"""RPC Implementation, originally written for the Python Idle IDE

For security reasons, GvR requested that Idle's Python execution server process
connect to the Idle process, which listens for the connection.  Since Idle has
only one client per server, this was not a limitation.

   +---------------------------------+ +-------------+
   | SocketServer.BaseRequestHandler | | SocketIO    |
   +---------------------------------+ +-------------+
                   ^                   | register()  |
                   |                   | unregister()|
                   |                   +-------------+
                   |                      ^  ^
                   |                      |  |
                   | + -------------------+  |
                   | |                       |
   +-------------------------+        +-----------------+
   | RPCHandler              |        | RPCClient       |
   | [attribute of RPCServer]|        |                 |
   +-------------------------+        +-----------------+

The RPCServer handler class is expected to provide register/unregister methods.
RPCHandler inherits the mix-in class SocketIO, which provides these methods.

See the Idle run.main() docstring for further information on how this was
accomplished in Idle.

"""

import sys
import os
import socket
import select
import SocketServer
import struct
import cPickle as pickle
import threading
import Queue
import traceback
import copy_reg
import types
import marshal


def unpickle_code(ms):
    "Rebuild a code object from the marshal string produced by pickle_code()"
    co = marshal.loads(ms)
    assert isinstance(co, types.CodeType)
    return co

def pickle_code(co):
    "copy_reg reducer: return (unpickle_code, (marshalled code,)) for co"
    assert isinstance(co, types.CodeType)
    ms = marshal.dumps(co)
    return unpickle_code, (ms,)

# XXX KBK 24Aug02 function pickling capability not used in Idle
#  def unpickle_function(ms):
#      return ms

#  def pickle_function(fn):
#      assert isinstance(fn, types.FunctionType)
#      return repr(fn)

# Teach pickle how to transport code objects (they are marshalled).
copy_reg.pickle(types.CodeType, pickle_code, unpickle_code)
# copy_reg.pickle(types.FunctionType, pickle_function, unpickle_function)

BUFSIZE = 8*1024            # max bytes passed to a single send()/recv()
LOCALHOST = '127.0.0.1'     # the only peer address RPCClient.accept() allows

class RPCServer(SocketServer.TCPServer):
    """TCPServer variant that connects out to its peer instead of listening.

    The bind/listen/accept phases of TCPServer are overridden so that the
    server's own socket is connected to 'addr' and then handed to the handler
    class as the (single) request.

    """

    def __init__(self, addr, handlerclass=None):
        "Create the server; handlerclass defaults to RPCHandler"
        if handlerclass is None:
            handlerclass = RPCHandler
        SocketServer.TCPServer.__init__(self, addr, handlerclass)

    def server_bind(self):
        "Override TCPServer method, no bind() phase for connecting entity"
        pass

    def server_activate(self):
        """Override TCPServer method, connect() instead of listen()

        Due to the reversed connection, self.server_address is actually the
        address of the Idle Client to which we are connecting.

        """
        self.socket.connect(self.server_address)

    def get_request(self):
        "Override TCPServer method, return already connected socket"
        return self.socket, self.server_address

    def handle_error(self, request, client_address):
        """Override TCPServer method

        Error message goes to __stderr__.  SystemExit is re-raised without
        any message.  Any other exception not handled in server code is
        reported and then terminates the process via os._exit.

        """
        try:
            # Re-raise the exception currently being handled so that it can
            # be dispatched on its type below.
            raise
        except SystemExit:
            raise
        except:
            # Deliberately catches everything: this is the last-resort
            # reporter before the process is terminated.
            erf = sys.__stderr__
            print>>erf, '\n' + '-'*40
            print>>erf, 'Unhandled server exception!'
            print>>erf, 'Thread: %s' % threading.currentThread().getName()
            print>>erf, 'Client Address: ', client_address
            print>>erf, 'Request: ', repr(request)
            traceback.print_exc(file=erf)
            print>>erf, '\n*** Unrecoverable, server exiting!'
            print>>erf, '-'*40
            os._exit(0)

#----------------- end class RPCServer --------------------

# Default table mapping object ids to the local objects that may be called
# remotely; shared by every SocketIO that is not given its own table.
objecttable = {}
# 'QUEUE' requests are placed here by SocketIO.localcall() as
# (seq, (method, args, kwargs)) tuples.
request_queue = Queue.Queue(0)
# (seq, response) tuples placed here are sent to the peer by
# SocketIO.pollresponse().
response_queue = Queue.Queue(0)


class SocketIO:
    """Mix-in implementing the message protocol on a connected socket.

    Messages are pickled (seq, data) tuples preceded by a 4 byte
    little-endian length.  The thread that created the instance
    (self.sockthread) does all reading from the socket; other threads wait
    on per-request condition variables for their responses.

    The debug() method reads self.debugging and self.location; __init__ only
    sets self.debugging when a value is passed, so classes using this mix-in
    supply both as class attributes (see RPCHandler and RPCClient).

    """

    nextseq = 0     # last sequence number issued; newseq() advances it by 2

    def __init__(self, sock, objtable=None, debugging=None):
        """Attach to the connected socket 'sock'.

        objtable -- oid -> object mapping; defaults to the module level
                    objecttable
        debugging -- if not None, overrides the class level debugging flag

        """
        self.sockthread = threading.currentThread()
        if debugging is not None:
            self.debugging = debugging
        self.sock = sock
        if objtable is None:
            objtable = objecttable
        self.objtable = objtable
        self.responses = {}     # seq -> response, for threads waiting on cvars
        self.cvars = {}         # seq -> Condition of the waiting thread

    def close(self):
        "Close the socket (if any) and forget it"
        sock = self.sock
        self.sock = None
        if sock is not None:
            sock.close()

    def exithook(self):
        "override for specific exit action"
        # os._exit() requires an exit status; calling it without one raises
        # TypeError instead of terminating the process.
        os._exit(0)

    def debug(self, *args):
        "Write location, thread name and args to __stderr__ if debugging"
        if not self.debugging:
            return
        s = self.location + " " + str(threading.currentThread().getName())
        for a in args:
            s = s + " " + str(a)
        print>>sys.__stderr__, s

    def register(self, oid, object):
        "Make 'object' callable by the peer under the id 'oid'"
        self.objtable[oid] = object

    def unregister(self, oid):
        "Remove 'oid' from the object table; unknown ids are ignored"
        try:
            del self.objtable[oid]
        except KeyError:
            pass

    def localcall(self, seq, request):
        """Execute or queue a request received from the peer.

        request is (how, (oid, methodname, args, kwargs)) where how is
        'CALL' or 'QUEUE'.  Returns a (status, value) response tuple whose
        status is one of 'OK', 'QUEUED', 'ERROR' or 'EXCEPTION'.
        SystemExit and socket.error raised by the called method propagate.

        """
        self.debug("localcall:", request)
        try:
            how, (oid, methodname, args, kwargs) = request
        except (TypeError, ValueError):
            # TypeError: not a sequence; ValueError: wrong number of items
            return ("ERROR", "Bad request format")
        if oid not in self.objtable:
            return ("ERROR", "Unknown object id: %r" % (oid,))
        obj = self.objtable[oid]
        # Two pseudo-methods let the peer discover the object's interface.
        if methodname == "__methods__":
            methods = {}
            _getmethods(obj, methods)
            return ("OK", methods)
        if methodname == "__attributes__":
            attributes = {}
            _getattributes(obj, attributes)
            return ("OK", attributes)
        if not hasattr(obj, methodname):
            return ("ERROR", "Unsupported method name: %r" % (methodname,))
        method = getattr(obj, methodname)
        try:
            if how == 'CALL':
                ret = method(*args, **kwargs)
                if isinstance(ret, RemoteObject):
                    # send a reference, not the object itself
                    ret = remoteref(ret)
                return ("OK", ret)
            elif how == 'QUEUE':
                request_queue.put((seq, (method, args, kwargs)))
                return ("QUEUED", None)
            else:
                return ("ERROR", "Unsupported message type: %s" % how)
        except SystemExit:
            raise
        except socket.error:
            raise
        except:
            # Any other failure of the called method is reported locally and
            # signalled to the peer as an 'EXCEPTION' response.
            self.debug("localcall:EXCEPTION")
            traceback.print_exc(file=sys.__stderr__)
            return ("EXCEPTION", None)

    def remotecall(self, oid, methodname, args, kwargs):
        "Send a 'CALL' request to the peer and return the decoded response"
        self.debug("remotecall:asynccall: ", oid, methodname)
        seq = self.asynccall(oid, methodname, args, kwargs)
        return self.asyncreturn(seq)

    def remotequeue(self, oid, methodname, args, kwargs):
        "Send a 'QUEUE' request to the peer and return the decoded response"
        self.debug("remotequeue:asyncqueue: ", oid, methodname)
        seq = self.asyncqueue(oid, methodname, args, kwargs)
        return self.asyncreturn(seq)

    def asynccall(self, oid, methodname, args, kwargs):
        "Send a 'CALL' request without waiting; return its sequence number"
        request = ("CALL", (oid, methodname, args, kwargs))
        seq = self.newseq()
        if threading.currentThread() != self.sockthread:
            # A thread other than the socket thread cannot read its own
            # response; it will wait on this condition in _getresponse().
            cvar = threading.Condition()
            self.cvars[seq] = cvar
        self.debug(("asynccall:%d:" % seq), oid, methodname, args, kwargs)
        self.putmessage((seq, request))
        return seq

    def asyncqueue(self, oid, methodname, args, kwargs):
        "Send a 'QUEUE' request without waiting; return its sequence number"
        request = ("QUEUE", (oid, methodname, args, kwargs))
        seq = self.newseq()
        if threading.currentThread() != self.sockthread:
            # see asynccall()
            cvar = threading.Condition()
            self.cvars[seq] = cvar
        self.debug(("asyncqueue:%d:" % seq), oid, methodname, args, kwargs)
        self.putmessage((seq, request))
        return seq

    def asyncreturn(self, seq):
        "Wait for the response to request 'seq' and return its decoded value"
        self.debug("asyncreturn:%d:call getresponse(): " % seq)
        response = self.getresponse(seq, wait=0.05)
        self.debug(("asyncreturn:%d:response: " % seq), response)
        return self.decoderesponse(response)

    def decoderesponse(self, response):
        """Translate a (how, what) response into a return value or exception.

        'OK' returns what; 'QUEUED' and 'EXCEPTION' return None; 'EOF' calls
        decode_interrupthook() and then returns None; 'ERROR' raises
        RuntimeError(what); anything else raises SystemError(how, what).

        """
        how, what = response
        if how == "OK":
            return what
        if how == "QUEUED":
            return None
        if how == "EXCEPTION":
            self.debug("decoderesponse: EXCEPTION")
            return None
        if how == "EOF":
            self.debug("decoderesponse: EOF")
            self.decode_interrupthook()
            return None
        if how == "ERROR":
            self.debug("decoderesponse: Internal ERROR:", what)
            raise RuntimeError(what)
        raise SystemError(how, what)

    def decode_interrupthook(self):
        "Called by decoderesponse() on an 'EOF' response; raises EOFError"
        raise EOFError

    def mainloop(self):
        """Listen on socket until I/O not ready or EOF

        pollresponse() will loop looking for seq number None, which
        never comes, and exit on EOFError.

        """
        try:
            self.getresponse(myseq=None, wait=0.05)
        except EOFError:
            self.debug("mainloop:return")
            return

    def getresponse(self, myseq, wait):
        "Return the response for 'myseq', converting 'OK' values to proxies"
        response = self._getresponse(myseq, wait)
        if response is not None:
            how, what = response
            if how == "OK":
                response = how, self._proxify(what)
        return response

    def _proxify(self, obj):
        "Replace RemoteProxy markers (also inside lists) by RPCProxy objects"
        if isinstance(obj, RemoteProxy):
            return RPCProxy(self, obj.oid)
        if isinstance(obj, types.ListType):
            return [self._proxify(item) for item in obj]
        # XXX Check for other types -- not currently needed
        return obj

    def _getresponse(self, myseq, wait):
        "Obtain the raw response for 'myseq' (read it, or wait to be notified)"
        self.debug("_getresponse:myseq:", myseq)
        if threading.currentThread() is self.sockthread:
            # this thread does all reading of requests or responses
            while True:
                response = self.pollresponse(myseq, wait)
                if response is not None:
                    return response
        else:
            # wait for notification from socket handling thread
            cvar = self.cvars[myseq]
            cvar.acquire()
            try:
                while myseq not in self.responses:
                    cvar.wait()
                response = self.responses[myseq]
                self.debug("_getresponse:%s: thread woke up: response: %s" %
                           (myseq, response))
                del self.responses[myseq]
                del self.cvars[myseq]
            finally:
                # never leave the condition locked, whatever happened above
                cvar.release()
            return response

    def newseq(self):
        "Return the next sequence number (steps of 2 from the class's start)"
        self.nextseq = seq = self.nextseq + 2
        return seq

    def putmessage(self, message):
        """Pickle 'message' and send it with a 4 byte length prefix.

        Raises pickle.PicklingError (after reporting to __stderr__) if the
        message cannot be pickled, and IOError if the socket is gone or the
        send fails.

        """
        self.debug("putmessage:%d:" % message[0])
        try:
            s = pickle.dumps(message)
        except pickle.PicklingError:
            print >>sys.__stderr__, "Cannot pickle:", repr(message)
            raise
        s = struct.pack("<i", len(s)) + s
        while len(s) > 0:
            try:
                # block until the socket is writable, then send one chunk
                r, w, x = select.select([], [self.sock], [])
                n = self.sock.send(s[:BUFSIZE])
            except (AttributeError, TypeError, socket.error):
                # socket was closed: after close() self.sock is None, which
                # makes select() raise TypeError and .send AttributeError
                raise IOError("socket no longer exists")
            else:
                s = s[n:]

    buffer = ""     # bytes received but not yet consumed
    bufneed = 4     # number of bytes needed to complete the current stage
    bufstate = 0    # meaning: 0 => reading count; 1 => reading data

    def pollpacket(self, wait):
        """Return one complete packet, or None if none is available yet.

        Waits up to 'wait' seconds for the socket to become readable when
        more data is needed.  Raises EOFError if the connection was closed
        or the recv() failed.

        """
        self._stage0()
        if len(self.buffer) < self.bufneed:
            r, w, x = select.select([self.sock.fileno()], [], [], wait)
            if len(r) == 0:
                return None
            try:
                s = self.sock.recv(BUFSIZE)
            except socket.error:
                raise EOFError
            if len(s) == 0:
                # zero bytes from a readable socket: peer closed the link
                raise EOFError
            self.buffer += s
            self._stage0()
        return self._stage1()

    def _stage0(self):
        "If waiting for a length prefix and 4 bytes are buffered, decode it"
        if self.bufstate == 0 and len(self.buffer) >= 4:
            s = self.buffer[:4]
            self.buffer = self.buffer[4:]
            self.bufneed = struct.unpack("<i", s)[0]
            self.bufstate = 1

    def _stage1(self):
        "Return the packet if it is completely buffered, else None"
        if self.bufstate == 1 and len(self.buffer) >= self.bufneed:
            packet = self.buffer[:self.bufneed]
            self.buffer = self.buffer[self.bufneed:]
            # go back to waiting for the next length prefix
            self.bufneed = 4
            self.bufstate = 0
            return packet

    def pollmessage(self, wait):
        "Return the next unpickled message, or None if no packet is ready"
        packet = self.pollpacket(wait)
        if packet is None:
            return None
        try:
            # NOTE: unpickling executes code chosen by the sender, so the
            # peer on this socket must be trusted.
            message = pickle.loads(packet)
        except pickle.UnpicklingError:
            print >>sys.__stderr__, "-----------------------"
            print >>sys.__stderr__, "cannot unpickle packet:", repr(packet)
            traceback.print_stack(file=sys.__stderr__)
            print >>sys.__stderr__, "-----------------------"
            raise
        return message

    def pollresponse(self, myseq, wait):
        """Handle messages received on the socket.

        Some messages received may be asynchronous 'call' or 'queue' requests,
        and some may be responses for other threads.

        'call' requests are passed to self.localcall() with the expectation of
        immediate execution, during which time the socket is not serviced.

        'queue' requests are used for tasks (which may block or hang) to be
        processed in a different thread.  These requests are fed into
        request_queue by self.localcall().  Responses to queued requests are
        taken from response_queue and sent across the link with the associated
        sequence numbers.  Messages in the queues are (sequence_number,
        request/response) tuples and code using this module removing messages
        from the request_queue is responsible for returning the correct
        sequence number in the response_queue.

        pollresponse() will loop until a response message with the myseq
        sequence number is received, and will save other responses in
        self.responses and notify the owning thread.

        Returns None when no message arrives within 'wait' seconds, and
        after handle_EOF() has been called for a closed link.

        """
        while True:
            # send queued response if there is one available
            try:
                qmsg = response_queue.get(0)
            except Queue.Empty:
                pass
            else:
                seq, response = qmsg
                message = (seq, ('OK', response))
                self.putmessage(message)
            # poll for message on link
            try:
                message = self.pollmessage(wait)
                if message is None:  # socket not ready
                    return None
            except EOFError:
                self.handle_EOF()
                return None
            except AttributeError:
                # presumably self.sock is None because close() was called
                return None
            seq, resq = message
            how = resq[0]
            self.debug("pollresponse:%d:myseq:%s" % (seq, myseq))
            # process or queue a request
            if how in ("CALL", "QUEUE"):
                self.debug("pollresponse:%d:localcall:call:" % seq)
                response = self.localcall(seq, resq)
                self.debug("pollresponse:%d:localcall:response:%s"
                           % (seq, response))
                if how == "CALL":
                    self.putmessage((seq, response))
                elif how == "QUEUE":
                    # don't acknowledge the 'queue' request!
                    pass
                continue
            # return if completed message transaction
            elif seq == myseq:
                return resq
            # must be a response for a different thread:
            else:
                cv = self.cvars.get(seq, None)
                # response involving unknown sequence number is discarded,
                # probably intended for prior incarnation of server
                if cv is not None:
                    cv.acquire()
                    self.responses[seq] = resq
                    cv.notify()
                    cv.release()
                continue

    def handle_EOF(self):
        "action taken upon link being closed by peer"
        self.EOFhook()
        self.debug("handle_EOF")
        # wake every thread still waiting for a response
        for key in self.cvars:
            cv = self.cvars[key]
            cv.acquire()
            self.responses[key] = ('EOF', None)
            cv.notify()
            cv.release()
        # call our (possibly overridden) exit function
        self.exithook()

    def EOFhook(self):
        "Classes using rpc client/server can override to augment EOF action"
        pass

#----------------- end class SocketIO --------------------

class RemoteObject:
    # Token mix-in class: a localcall() result that is an instance of this
    # class is sent to the peer as a reference (see remoteref()).
    pass

def remoteref(obj):
    "Register obj in objecttable under id(obj) and return a RemoteProxy for it"
    oid = id(obj)
    objecttable[oid] = obj
    return RemoteProxy(oid)

class RemoteProxy:
    "Picklable marker carrying an object id; see SocketIO._proxify()"

    def __init__(self, oid):
        self.oid = oid

class RPCHandler(SocketServer.BaseRequestHandler, SocketIO):
    "Request handler used by RPCServer; runs SocketIO.mainloop()"

    debugging = False
    location = "#S"  # Server

    def __init__(self, sock, addr, svr):
        svr.current_handler = self ## cgt xxx
        SocketIO.__init__(self, sock)
        SocketServer.BaseRequestHandler.__init__(self, sock, addr, svr)

    def handle(self):
        "handle() method required by SocketServer"
        self.mainloop()

    def get_remote_proxy(self, oid):
        "Return an RPCProxy for the peer's object registered as 'oid'"
        return RPCProxy(self, oid)

class RPCClient(SocketIO):
    """Listening side of the link.

    The constructor only creates the listening socket; SocketIO is
    initialized by accept() once a peer has connected.

    """

    debugging = False
    location = "#C"  # Client

    nextseq = 1 # Requests coming from the client are odd numbered

    def __init__(self, address, family=socket.AF_INET, type=socket.SOCK_STREAM):
        "Bind a listening socket to 'address' (backlog of 1)"
        self.listening_sock = socket.socket(family, type)
        self.listening_sock.setsockopt(socket.SOL_SOCKET,
                                       socket.SO_REUSEADDR, 1)
        self.listening_sock.bind(address)
        self.listening_sock.listen(1)

    def accept(self):
        """Accept one connection and start using it.

        Only a peer connecting from LOCALHOST is accepted; for any other
        address the connection is closed and socket.error is raised.

        """
        working_sock, address = self.listening_sock.accept()
        if self.debugging:
            print>>sys.__stderr__, "****** Connection request from ", address
        if address[0] == LOCALHOST:
            SocketIO.__init__(self, working_sock)
        else:
            print>>sys.__stderr__, "** Invalid host: ", address
            # do not leak the rejected connection
            working_sock.close()
            raise socket.error

    def get_remote_proxy(self, oid):
        "Return an RPCProxy for the peer's object registered as 'oid'"
        return RPCProxy(self, oid)

class RPCProxy:
    """Local stand-in for an object registered on the other side of the link.

    The remote object's method and attribute names are fetched on first use
    and cached on the proxy.

    """

    __methods = None        # name -> 1 for callables, filled in lazily
    __attributes = None     # name -> 1 for non-callables, filled in lazily

    def __init__(self, sockio, oid):
        self.sockio = sockio
        self.oid = oid

    def __getattr__(self, name):
        """Return a MethodProxy for remote method 'name'.

        Raises AttributeError if 'name' is neither a method nor an attribute
        of the remote object.

        """
        if self.__methods is None:
            self.__getmethods()
        if self.__methods.get(name):
            return MethodProxy(self.sockio, self.oid, name)
        if self.__attributes is None:
            self.__getattributes()
        if name not in self.__attributes:
            raise AttributeError(name)
        # NOTE(review): for a known non-callable attribute the remote value
        # is never fetched; the result is always None.  Looks unintended,
        # but callers may rely on it -- confirm before changing.
        return None

    def __getattributes(self):
        "Fetch and cache the remote object's non-callable attribute names"
        self.__attributes = self.sockio.remotecall(self.oid,
                                                "__attributes__", (), {})

    def __getmethods(self):
        "Fetch and cache the remote object's callable attribute names"
        self.__methods = self.sockio.remotecall(self.oid,
                                                "__methods__", (), {})

def _getmethods(obj, methods):
    # Helper to get a list of methods from an object
    # Adds names to dictionary argument 'methods'
    for name in dir(obj):
        attr = getattr(obj, name)
        if callable(attr):
            methods[name] = 1
    # For old-style instances and classes also walk the class and its bases.
    if type(obj) is types.InstanceType:
        _getmethods(obj.__class__, methods)
    if type(obj) is types.ClassType:
        for base in obj.__bases__:
            _getmethods(base, methods)

def _getattributes(obj, attributes):
    # Helper to get the non-callable attributes of an object
    # Adds names to dictionary argument 'attributes'
    for name in dir(obj):
        attr = getattr(obj, name)
        if not callable(attr):
            attributes[name] = 1

class MethodProxy:
    "Callable that forwards its arguments to a named method of a remote object"

    def __init__(self, sockio, oid, name):
        self.sockio = sockio
        self.oid = oid
        self.name = name

    def __call__(self, *args, **kwargs):
        "Call the remote method and return its decoded result"
        value = self.sockio.remotecall(self.oid, self.name, args, kwargs)
        return value


# XXX KBK 09Sep03  We need a proper unit test for this module.  Previously
#                  existing test code was removed at Rev 1.27.

# Generated by PyXR 0.9.4