PyXR

c:\python24\lib\test\test_asynchat.py



0001 # test asynchat -- requires threading
0002 
0003 import thread # If this fails, we can't test this module
0004 import asyncore, asynchat, socket, threading, time
0005 
0006 HOST = "127.0.0.1"
0007 PORT = 54321
0008 
class echo_server(threading.Thread):
    """One-shot echo server that acts as the peer for the asynchat client.

    When the thread runs it accepts a single connection on (HOST, PORT),
    reads until a newline has been received (or the peer closes the
    connection), sends back everything it read, and then shuts down.
    """

    def run(self):
        """Serve exactly one connection, then close both sockets.

        Both sockets are closed even if binding, accepting, receiving or
        sending raises, so a failed run does not leak the listening port.
        """
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            sock.bind((HOST, PORT))
            sock.listen(1)
            conn, _peer_addr = sock.accept()
            try:
                # Named 'received' rather than 'buffer' to avoid shadowing
                # the builtin of that name.
                received = ""
                while "\n" not in received:
                    data = conn.recv(10)
                    if not data:
                        # Peer closed before sending a full line; echo
                        # whatever arrived so far.
                        break
                    received = received + data
                # sendall() keeps sending until every byte is written,
                # replacing the manual send()/slice loop.
                conn.sendall(received)
            finally:
                conn.close()
        finally:
            sock.close()
0028 
0029 class echo_client(asynchat.async_chat):
0030 
0031     def __init__(self):
0032         asynchat.async_chat.__init__(self)
0033         self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
0034         self.connect((HOST, PORT))
0035         self.set_terminator("\n")
0036         self.buffer = ""
0037 
0038     def handle_connect(self):
0039         print "Connected"
0040 
0041     def collect_incoming_data(self, data):
0042         self.buffer = self.buffer + data
0043 
0044     def found_terminator(self):
0045         print "Received:", repr(self.buffer)
0046         self.buffer = ""
0047         self.close()
0048 
def main():
    """Run one echo round trip between echo_server and echo_client.

    Starts the server thread, pushes "hello world\\n" through the client in
    two pieces, runs the asyncore loop until no channels remain, and then
    waits for the server thread to finish.
    """
    server = echo_server()
    server.start()
    time.sleep(1) # Give server time to initialize
    client = echo_client()
    # The line is pushed in two parts; the terminator only arrives with
    # the second push.
    client.push("hello ")
    client.push("world\n")
    asyncore.loop()
    # Wait for the server thread so its sockets are closed (and the port
    # released) before this function returns.
    server.join()

# NOTE(review): the test runs at import time; presumably the test driver
# relies on that, so no __main__ guard is added here -- confirm before
# changing.
main()
0059 

Generated by PyXR 0.9.4
SourceForge.net Logo