PyXR

c:\python24\lib \ idlelib \ rpc.py



0001 """RPC Implementation, originally written for the Python Idle IDE
0002 
0003 For security reasons, GvR requested that Idle's Python execution server process
0004 connect to the Idle process, which listens for the connection.  Since Idle has
0005 only one client per server, this was not a limitation.
0006 
0007    +---------------------------------+ +-------------+
0008    | SocketServer.BaseRequestHandler | | SocketIO    |
0009    +---------------------------------+ +-------------+
0010                    ^                   | register()  |
0011                    |                   | unregister()|
0012                    |                   +-------------+
0013                    |                      ^  ^
0014                    |                      |  |
0015                    | + -------------------+  |
0016                    | |                       |
0017    +-------------------------+        +-----------------+
0018    | RPCHandler              |        | RPCClient       |
0019    | [attribute of RPCServer]|        |                 |
0020    +-------------------------+        +-----------------+
0021 
0022 The RPCServer handler class is expected to provide register/unregister methods.
0023 RPCHandler inherits the mix-in class SocketIO, which provides these methods.
0024 
0025 See the Idle run.main() docstring for further information on how this was
0026 accomplished in Idle.
0027 
0028 """
0029 
0030 import sys
0031 import os
0032 import socket
0033 import select
0034 import SocketServer
0035 import struct
0036 import cPickle as pickle
0037 import threading
0038 import Queue
0039 import traceback
0040 import copy_reg
0041 import types
0042 import marshal
0043 
0044 
def unpickle_code(ms):
    """Rebuild a code object from its marshalled string *ms*.

    This is the reconstructor half of the pickle support for code
    objects; the result is asserted to be a real code object.
    """
    code = marshal.loads(ms)
    assert isinstance(code, types.CodeType)
    return code
0049 
def pickle_code(co):
    """Reduce the code object *co* for pickling.

    Returns the (callable, args) pair expected from a reduction function:
    unpickle_code applied to the marshalled form of *co* recreates it.
    """
    assert isinstance(co, types.CodeType)
    return unpickle_code, (marshal.dumps(co),)
0054 
# XXX KBK 24Aug02 function pickling capability not used in Idle
#  def unpickle_function(ms):
#      return ms

#  def pickle_function(fn):
#      assert isinstance(fn, type.FunctionType)
#      return repr(fn)

# Teach pickle how to handle code objects: pickle_code reduces them to a
# marshalled string and unpickle_code rebuilds them on the receiving side.
copy_reg.pickle(types.CodeType, pickle_code, unpickle_code)
# copy_reg.pickle(types.FunctionType, pickle_function, unpickle_function)
0065 
# Largest chunk handed to a single send() or recv() call on the RPC socket.
BUFSIZE = 8*1024
# Only connections whose peer address equals this are accepted by
# RPCClient.accept().
LOCALHOST = '127.0.0.1'
0068 
class RPCServer(SocketServer.TCPServer):
    """TCPServer variant that connects out to its peer instead of listening."""

    def __init__(self, addr, handlerclass=None):
        """Set up the server; RPCHandler is used when no handler is given."""
        if handlerclass is None:
            handlerclass = RPCHandler
        SocketServer.TCPServer.__init__(self, addr, handlerclass)

    def server_bind(self):
        """Override TCPServer method, no bind() phase for connecting entity"""
        pass

    def server_activate(self):
        """Override TCPServer method, connect() instead of listen()

        The connection direction is reversed, so self.server_address is
        really the address of the Idle Client this process connects to.

        """
        self.socket.connect(self.server_address)

    def get_request(self):
        """Override TCPServer method, return already connected socket"""
        return self.socket, self.server_address

    def handle_error(self, request, client_address):
        """Override TCPServer method

        SystemExit is allowed to propagate untouched.  Any other exception
        is reported on __stderr__ together with the thread name, client
        address and request, after which the process ends via os._exit.

        """
        try:
            # re-raise the exception currently being handled so it can be
            # classified by the except clauses below
            raise
        except SystemExit:
            raise
        except:
            stream = sys.__stderr__
            divider = '-'*40
            thread_name = threading.currentThread().getName()
            print>>stream, '\n' + divider
            print>>stream, 'Unhandled server exception!'
            print>>stream, 'Thread: %s' % thread_name
            print>>stream, 'Client Address: ', client_address
            print>>stream, 'Request: ', repr(request)
            traceback.print_exc(file=stream)
            print>>stream, '\n*** Unrecoverable, server exiting!'
            print>>stream, divider
            os._exit(0)
0116 
0117 #----------------- end class RPCServer --------------------
0118 
# Default oid -> object registry shared by every SocketIO that is not given
# its own table; remoteref() also stores objects here.
objecttable = {}
# (seq, (method, args, kwargs)) tuples put here by SocketIO.localcall() for
# 'QUEUE' requests; code using this module is expected to consume them.
request_queue = Queue.Queue(0)
# (seq, response) tuples placed here are sent back over the link by
# SocketIO.pollresponse().
response_queue = Queue.Queue(0)
0122 
0123 
class SocketIO:
    """RPC endpoint exchanging pickled messages over a connected socket.

    Every message on the wire is a pickled (seq, payload) tuple preceded by
    its length packed as a 4-byte little-endian int ("<i").  Request
    payloads look like (how, (oid, methodname, args, kwargs)) with how being
    "CALL" or "QUEUE"; response payloads are (how, what) tuples.

    The thread that creates the instance (self.sockthread) does all reading
    from the socket.  Other threads issuing requests wait on a per-request
    condition variable until that thread delivers their response.

    debug() reads self.debugging and self.location; this class does not
    define defaults for them, so subclasses are expected to.
    """

    # Sequence numbers advance by 2 from this starting value (see newseq()).
    nextseq = 0

    def __init__(self, sock, objtable=None, debugging=None):
        """Wrap the connected socket *sock*.

        objtable  -- oid -> object mapping served by localcall(); defaults
                     to the module-level objecttable.
        debugging -- when not None, overrides the class-level flag.
        """
        self.sockthread = threading.currentThread()
        if debugging is not None:
            self.debugging = debugging
        self.sock = sock
        if objtable is None:
            objtable = objecttable
        self.objtable = objtable
        self.responses = {}   # seq -> response awaiting pickup by its thread
        self.cvars = {}       # seq -> Condition of the thread waiting for it

    def close(self):
        """Close the socket once; further calls are no-ops."""
        sock = self.sock
        self.sock = None
        if sock is not None:
            sock.close()

    def exithook(self):
        "override for specific exit action"
        # os._exit() requires an exit status; calling it without one raises
        # TypeError instead of terminating the process.
        os._exit(0)

    def debug(self, *args):
        """Write a trace line to __stderr__ when self.debugging is true."""
        if not self.debugging:
            return
        s = self.location + " " + str(threading.currentThread().getName())
        for a in args:
            s = s + " " + str(a)
        print>>sys.__stderr__, s

    def register(self, oid, object):
        """Make *object* callable by the peer under the id *oid*."""
        self.objtable[oid] = object

    def unregister(self, oid):
        """Remove *oid* from the object table; unknown ids are ignored."""
        try:
            del self.objtable[oid]
        except KeyError:
            pass

    def localcall(self, seq, request):
        """Execute (or queue) a request received from the peer.

        Returns a (how, what) response tuple:
          ("OK", value)        -- the call succeeded
          ("QUEUED", None)     -- the call was put on request_queue
          ("ERROR", message)   -- malformed request / unknown object or method
          ("EXCEPTION", None)  -- the method raised; traceback on __stderr__
        SystemExit and socket.error raised by the method propagate.
        """
        self.debug("localcall:", request)
        try:
            how, (oid, methodname, args, kwargs) = request
        except (TypeError, ValueError):
            # TypeError: request (or its second item) is not iterable;
            # ValueError: it has the wrong number of items.
            return ("ERROR", "Bad request format")
        if oid not in self.objtable:
            return ("ERROR", "Unknown object id: %r" % (oid,))
        obj = self.objtable[oid]
        # two pseudo-methods let the peer introspect the object
        if methodname == "__methods__":
            methods = {}
            _getmethods(obj, methods)
            return ("OK", methods)
        if methodname == "__attributes__":
            attributes = {}
            _getattributes(obj, attributes)
            return ("OK", attributes)
        if not hasattr(obj, methodname):
            return ("ERROR", "Unsupported method name: %r" % (methodname,))
        method = getattr(obj, methodname)
        try:
            if how == 'CALL':
                ret = method(*args, **kwargs)
                # RemoteObject results are sent by reference, not by value
                if isinstance(ret, RemoteObject):
                    ret = remoteref(ret)
                return ("OK", ret)
            elif how == 'QUEUE':
                request_queue.put((seq, (method, args, kwargs)))
                return("QUEUED", None)
            else:
                return ("ERROR", "Unsupported message type: %s" % how)
        except SystemExit:
            raise
        except socket.error:
            raise
        except:
            self.debug("localcall:EXCEPTION")
            traceback.print_exc(file=sys.__stderr__)
            return ("EXCEPTION", None)

    def remotecall(self, oid, methodname, args, kwargs):
        """Send a 'CALL' request to the peer and return the decoded result."""
        self.debug("remotecall:asynccall: ", oid, methodname)
        seq = self.asynccall(oid, methodname, args, kwargs)
        return self.asyncreturn(seq)

    def remotequeue(self, oid, methodname, args, kwargs):
        """Send a 'QUEUE' request to the peer and return the decoded result."""
        self.debug("remotequeue:asyncqueue: ", oid, methodname)
        seq = self.asyncqueue(oid, methodname, args, kwargs)
        return self.asyncreturn(seq)

    def asynccall(self, oid, methodname, args, kwargs):
        """Send a 'CALL' request without waiting; return its sequence number."""
        request = ("CALL", (oid, methodname, args, kwargs))
        seq = self.newseq()
        # threads other than the socket thread need a condition variable to
        # be woken up on when their response arrives
        if threading.currentThread() != self.sockthread:
            cvar = threading.Condition()
            self.cvars[seq] = cvar
        self.debug(("asynccall:%d:" % seq), oid, methodname, args, kwargs)
        self.putmessage((seq, request))
        return seq

    def asyncqueue(self, oid, methodname, args, kwargs):
        """Send a 'QUEUE' request without waiting; return its sequence number."""
        request = ("QUEUE", (oid, methodname, args, kwargs))
        seq = self.newseq()
        if threading.currentThread() != self.sockthread:
            cvar = threading.Condition()
            self.cvars[seq] = cvar
        self.debug(("asyncqueue:%d:" % seq), oid, methodname, args, kwargs)
        self.putmessage((seq, request))
        return seq

    def asyncreturn(self, seq):
        """Wait for the response to request *seq* and decode it."""
        self.debug("asyncreturn:%d:call getresponse(): " % seq)
        response = self.getresponse(seq, wait=0.05)
        self.debug(("asyncreturn:%d:response: " % seq), response)
        return self.decoderesponse(response)

    def decoderesponse(self, response):
        """Turn a (how, what) response into a return value or an exception.

        "OK" returns what; "QUEUED" and "EXCEPTION" return None; "EOF" calls
        decode_interrupthook() and returns None if that hook returns;
        "ERROR" raises RuntimeError(what); anything else raises SystemError.
        """
        how, what = response
        if how == "OK":
            return what
        if how == "QUEUED":
            return None
        if how == "EXCEPTION":
            self.debug("decoderesponse: EXCEPTION")
            return None
        if how == "EOF":
            self.debug("decoderesponse: EOF")
            self.decode_interrupthook()
            return None
        if how == "ERROR":
            self.debug("decoderesponse: Internal ERROR:", what)
            raise RuntimeError(what)
        raise SystemError(how, what)

    def decode_interrupthook(self):
        "Called when an 'EOF' response is decoded; raises EOFError."
        raise EOFError

    def mainloop(self):
        """Listen on socket until I/O not ready or EOF

        pollresponse() will loop looking for seq number None, which
        never comes, and exit on EOFError.

        """
        try:
            self.getresponse(myseq=None, wait=0.05)
        except EOFError:
            self.debug("mainloop:return")
            return

    def getresponse(self, myseq, wait):
        """Return the response for *myseq*, proxifying the value of "OK" ones."""
        response = self._getresponse(myseq, wait)
        if response is not None:
            how, what = response
            if how == "OK":
                response = how, self._proxify(what)
        return response

    def _proxify(self, obj):
        """Replace RemoteProxy markers (also inside lists) with RPCProxy objects."""
        if isinstance(obj, RemoteProxy):
            return RPCProxy(self, obj.oid)
        if isinstance(obj, list):
            return [self._proxify(item) for item in obj]
        # XXX Check for other types -- not currently needed
        return obj

    def _getresponse(self, myseq, wait):
        """Obtain the raw response for *myseq*.

        On the socket thread this polls the link until the response shows
        up; on any other thread it blocks on the request's condition
        variable until the socket thread stores the response.
        """
        self.debug("_getresponse:myseq:", myseq)
        if threading.currentThread() is self.sockthread:
            # this thread does all reading of requests or responses
            while 1:
                response = self.pollresponse(myseq, wait)
                if response is not None:
                    return response
        else:
            # wait for notification from socket handling thread
            cvar = self.cvars[myseq]
            cvar.acquire()
            try:
                while myseq not in self.responses:
                    cvar.wait()
                response = self.responses[myseq]
                self.debug("_getresponse:%s: thread woke up: response: %s" %
                           (myseq, response))
                del self.responses[myseq]
                del self.cvars[myseq]
            finally:
                # never leave the condition locked, even if the wait is
                # interrupted by an exception
                cvar.release()
            return response

    def newseq(self):
        """Return the next sequence number (steps of 2 from nextseq)."""
        self.nextseq = seq = self.nextseq + 2
        return seq

    def putmessage(self, message):
        """Pickle *message*, frame it with its length and send it.

        Raises pickle.PicklingError if the message cannot be pickled and
        IOError if the socket has been closed or fails while sending.
        """
        self.debug("putmessage:%d:" % message[0])
        try:
            s = pickle.dumps(message)
        except pickle.PicklingError:
            print >>sys.__stderr__, "Cannot pickle:", repr(message)
            raise
        s = struct.pack("<i", len(s)) + s
        while len(s) > 0:
            try:
                # block until the socket is writable, then send one chunk
                r, w, x = select.select([], [self.sock], [])
                n = self.sock.send(s[:BUFSIZE])
            except (AttributeError, socket.error):
                # socket was closed
                raise IOError
            else:
                s = s[n:]

    # Receive-side framing state.
    buffer = ""
    bufneed = 4
    bufstate = 0 # meaning: 0 => reading count; 1 => reading data

    def pollpacket(self, wait):
        """Return the next complete packet, or None if none is available yet.

        Waits at most *wait* seconds for the socket to become readable.
        Raises EOFError when the peer closed the link or recv() fails.
        """
        self._stage0()
        if len(self.buffer) < self.bufneed:
            r, w, x = select.select([self.sock.fileno()], [], [], wait)
            if len(r) == 0:
                return None
            try:
                s = self.sock.recv(BUFSIZE)
            except socket.error:
                raise EOFError
            if len(s) == 0:
                raise EOFError
            self.buffer += s
            self._stage0()
        return self._stage1()

    def _stage0(self):
        """Consume the 4-byte length header once it is fully buffered."""
        if self.bufstate == 0 and len(self.buffer) >= 4:
            s = self.buffer[:4]
            self.buffer = self.buffer[4:]
            self.bufneed = struct.unpack("<i", s)[0]
            self.bufstate = 1

    def _stage1(self):
        """Return the packet body once it is fully buffered, else None."""
        if self.bufstate == 1 and len(self.buffer) >= self.bufneed:
            packet = self.buffer[:self.bufneed]
            self.buffer = self.buffer[self.bufneed:]
            self.bufneed = 4
            self.bufstate = 0
            return packet

    def pollmessage(self, wait):
        """Return the next unpickled message, or None if none is ready."""
        packet = self.pollpacket(wait)
        if packet is None:
            return None
        try:
            # NOTE(review): unpickling executes whatever the peer sent; this
            # is only safe while the link is restricted to a trusted peer.
            message = pickle.loads(packet)
        except pickle.UnpicklingError:
            print >>sys.__stderr__, "-----------------------"
            print >>sys.__stderr__, "cannot unpickle packet:", repr(packet)
            traceback.print_stack(file=sys.__stderr__)
            print >>sys.__stderr__, "-----------------------"
            raise
        return message

    def pollresponse(self, myseq, wait):
        """Handle messages received on the socket.

        Some messages received may be asynchronous 'call' or 'queue' requests,
        and some may be responses for other threads.

        'call' requests are passed to self.localcall() with the expectation of
        immediate execution, during which time the socket is not serviced.

        'queue' requests are used for tasks (which may block or hang) to be
        processed in a different thread.  These requests are fed into
        request_queue by self.localcall().  Responses to queued requests are
        taken from response_queue and sent across the link with the associated
        sequence numbers.  Messages in the queues are (sequence_number,
        request/response) tuples and code using this module removing messages
        from the request_queue is responsible for returning the correct
        sequence number in the response_queue.

        pollresponse() will loop until a response message with the myseq
        sequence number is received, and will save other responses in
        self.responses and notify the owning thread.

        """
        while 1:
            # send queued response if there is one available
            try:
                qmsg = response_queue.get(0)
            except Queue.Empty:
                pass
            else:
                seq, response = qmsg
                message = (seq, ('OK', response))
                self.putmessage(message)
            # poll for message on link
            try:
                message = self.pollmessage(wait)
                if message is None:  # socket not ready
                    return None
            except EOFError:
                self.handle_EOF()
                return None
            except AttributeError:
                return None
            seq, resq = message
            how = resq[0]
            self.debug("pollresponse:%d:myseq:%s" % (seq, myseq))
            # process or queue a request
            if how in ("CALL", "QUEUE"):
                self.debug("pollresponse:%d:localcall:call:" % seq)
                response = self.localcall(seq, resq)
                self.debug("pollresponse:%d:localcall:response:%s"
                           % (seq, response))
                if how == "CALL":
                    self.putmessage((seq, response))
                elif how == "QUEUE":
                    # don't acknowledge the 'queue' request!
                    pass
                continue
            # return if completed message transaction
            elif seq == myseq:
                return resq
            # must be a response for a different thread:
            else:
                cv = self.cvars.get(seq, None)
                # response involving unknown sequence number is discarded,
                # probably intended for prior incarnation of server
                if cv is not None:
                    cv.acquire()
                    self.responses[seq] = resq
                    cv.notify()
                    cv.release()
                continue

    def handle_EOF(self):
        "action taken upon link being closed by peer"
        self.EOFhook()
        self.debug("handle_EOF")
        # Iterate over a snapshot: each woken thread deletes its own entry
        # from self.cvars, which must not disturb this loop.
        for key, cv in list(self.cvars.items()):
            cv.acquire()
            self.responses[key] = ('EOF', None)
            cv.notify()
            cv.release()
        # call our (possibly overridden) exit function
        self.exithook()

    def EOFhook(self):
        "Classes using rpc client/server can override to augment EOF action"
        pass
0475 
0476 #----------------- end class SocketIO --------------------
0477 
class RemoteObject:
    # Token mix-in class: SocketIO.localcall() passes a return value that is
    # an instance of this class through remoteref() instead of sending the
    # object itself.
    pass
0481 
def remoteref(obj):
    """Register *obj* in the module object table and return a proxy marker.

    The object's id() serves as its oid; the returned RemoteProxy carries
    only that oid.
    """
    key = id(obj)
    objecttable[key] = obj
    return RemoteProxy(key)
0486 
class RemoteProxy:
    # Marker holding only an object id.  remoteref() creates these, and
    # SocketIO._proxify() replaces them with RPCProxy objects when they
    # appear in an "OK" response.

    def __init__(self, oid):
        self.oid = oid
0491 
class RPCHandler(SocketServer.BaseRequestHandler, SocketIO):
    # Request handler used by RPCServer by default; combines the
    # SocketServer handler protocol with the SocketIO message machinery.

    debugging = False
    location = "#S"  # Server

    def __init__(self, sock, addr, svr):
        # Publish this handler on the server object before anything else.
        svr.current_handler = self ## cgt xxx
        # SocketIO state must exist before the base handler is initialised.
        # NOTE(review): presumably because BaseRequestHandler.__init__ itself
        # invokes handle() -- confirm against the SocketServer module.
        SocketIO.__init__(self, sock)
        SocketServer.BaseRequestHandler.__init__(self, sock, addr, svr)

    def handle(self):
        "handle() method required by SocketServer"
        self.mainloop()

    def get_remote_proxy(self, oid):
        # Return a proxy for the peer's object registered under oid.
        return RPCProxy(self, oid)
0508 
class RPCClient(SocketIO):
    """Listening side of the RPC link.

    Despite the name, this end binds and listens; the peer connects to it.
    SocketIO is only initialised once accept() has received a connection
    from LOCALHOST.
    """

    debugging = False
    location = "#C"  # Client

    nextseq = 1 # Requests coming from the client are odd numbered

    def __init__(self, address, family=socket.AF_INET, type=socket.SOCK_STREAM):
        """Create a listening socket bound to *address* (backlog of 1)."""
        self.listening_sock = socket.socket(family, type)
        self.listening_sock.setsockopt(socket.SOL_SOCKET,
                                       socket.SO_REUSEADDR, 1)
        self.listening_sock.bind(address)
        self.listening_sock.listen(1)

    def accept(self):
        """Accept one connection and start using it for RPC traffic.

        Raises socket.error if the peer is not LOCALHOST; the rejected
        connection is closed first so it does not leak.
        """
        working_sock, address = self.listening_sock.accept()
        if self.debugging:
            print>>sys.__stderr__, "****** Connection request from ", address
        if address[0] == LOCALHOST:
            SocketIO.__init__(self, working_sock)
        else:
            print>>sys.__stderr__, "** Invalid host: ", address
            # do not keep a connection to an untrusted peer open
            working_sock.close()
            raise socket.error

    def get_remote_proxy(self, oid):
        """Return a proxy for the peer's object registered under *oid*."""
        return RPCProxy(self, oid)
0535 
class RPCProxy:
    """Local stand-in for an object registered on the other end of the link.

    Method names resolve to MethodProxy callables; data attribute names are
    fetched from the remote object.  The remote method and attribute name
    tables are requested lazily, on first attribute access, and cached.
    """

    __methods = None
    __attributes = None

    def __init__(self, sockio, oid):
        self.sockio = sockio
        self.oid = oid

    def __getattr__(self, name):
        """Resolve *name* against the remote object.

        Returns a MethodProxy for remote callables and the current remote
        value for data attributes; raises AttributeError for names the
        remote object does not report.
        """
        if self.__methods is None:
            self.__getmethods()
        if self.__methods.get(name):
            return MethodProxy(self.sockio, self.oid, name)
        if self.__attributes is None:
            self.__getattributes()
        if name in self.__attributes:
            # Fetch the value instead of falling off the end and returning
            # None.  NOTE(review): relies on the remote object exposing
            # __getattribute__ -- confirm for every registered object type.
            return self.sockio.remotecall(self.oid, "__getattribute__",
                                          (name,), {})
        raise AttributeError(name)

    def __getattributes(self):
        # cache the remote object's non-callable attribute names
        self.__attributes = self.sockio.remotecall(self.oid,
                                                "__attributes__", (), {})

    def __getmethods(self):
        # cache the remote object's callable attribute names
        self.__methods = self.sockio.remotecall(self.oid,
                                                "__methods__", (), {})
0562 
0563 def _getmethods(obj, methods):
0564     # Helper to get a list of methods from an object
0565     # Adds names to dictionary argument 'methods'
0566     for name in dir(obj):
0567         attr = getattr(obj, name)
0568         if callable(attr):
0569             methods[name] = 1
0570     if type(obj) == types.InstanceType:
0571         _getmethods(obj.__class__, methods)
0572     if type(obj) == types.ClassType:
0573         for super in obj.__bases__:
0574             _getmethods(super, methods)
0575 
0576 def _getattributes(obj, attributes):
0577     for name in dir(obj):
0578         attr = getattr(obj, name)
0579         if not callable(attr):
0580             attributes[name] = 1
0581 
class MethodProxy:
    """Callable standing in for one named method of a remote object."""

    def __init__(self, sockio, oid, name):
        self.sockio, self.oid, self.name = sockio, oid, name

    def __call__(self, *args, **kwargs):
        # forward the call over the link and hand back whatever it returns
        return self.sockio.remotecall(self.oid, self.name, args, kwargs)
0592 
0593 
0594 # XXX KBK 09Sep03  We need a proper unit test for this module.  Previously
0595 #                  existing test code was removed at Rev 1.27.
0596 

Generated by PyXR 0.9.4
SourceForge.net Logo