0001 # Test case for the os.poll() function 0002 0003 import sys, os, select, random 0004 from test.test_support import verify, verbose, TestSkipped, TESTFN 0005 0006 try: 0007 select.poll 0008 except AttributeError: 0009 raise TestSkipped, "select.poll not defined -- skipping test_poll" 0010 0011 0012 def find_ready_matching(ready, flag): 0013 match = [] 0014 for fd, mode in ready: 0015 if mode & flag: 0016 match.append(fd) 0017 return match 0018 0019 def test_poll1(): 0020 """Basic functional test of poll object 0021 0022 Create a bunch of pipe and test that poll works with them. 0023 """ 0024 print 'Running poll test 1' 0025 p = select.poll() 0026 0027 NUM_PIPES = 12 0028 MSG = " This is a test." 0029 MSG_LEN = len(MSG) 0030 readers = [] 0031 writers = [] 0032 r2w = {} 0033 w2r = {} 0034 0035 for i in range(NUM_PIPES): 0036 rd, wr = os.pipe() 0037 p.register(rd, select.POLLIN) 0038 p.register(wr, select.POLLOUT) 0039 readers.append(rd) 0040 writers.append(wr) 0041 r2w[rd] = wr 0042 w2r[wr] = rd 0043 0044 while writers: 0045 ready = p.poll() 0046 ready_writers = find_ready_matching(ready, select.POLLOUT) 0047 if not ready_writers: 0048 raise RuntimeError, "no pipes ready for writing" 0049 wr = random.choice(ready_writers) 0050 os.write(wr, MSG) 0051 0052 ready = p.poll() 0053 ready_readers = find_ready_matching(ready, select.POLLIN) 0054 if not ready_readers: 0055 raise RuntimeError, "no pipes ready for reading" 0056 rd = random.choice(ready_readers) 0057 buf = os.read(rd, MSG_LEN) 0058 verify(len(buf) == MSG_LEN) 0059 print buf 0060 os.close(r2w[rd]) ; os.close( rd ) 0061 p.unregister( r2w[rd] ) 0062 p.unregister( rd ) 0063 writers.remove(r2w[rd]) 0064 0065 poll_unit_tests() 0066 print 'Poll test 1 complete' 0067 0068 def poll_unit_tests(): 0069 # returns NVAL for invalid file descriptor 0070 FD = 42 0071 try: 0072 os.close(FD) 0073 except OSError: 0074 pass 0075 p = select.poll() 0076 p.register(FD) 0077 r = p.poll() 0078 verify(r[0] == (FD, select.POLLNVAL)) 0079 0080 f = open(TESTFN, 'w') 0081 fd = f.fileno() 0082 p = select.poll() 0083 p.register(f) 0084 r = p.poll() 0085 verify(r[0][0] == fd) 0086 f.close() 0087 r = p.poll() 0088 verify(r[0] == (fd, select.POLLNVAL)) 0089 os.unlink(TESTFN) 0090 0091 # type error for invalid arguments 0092 p = select.poll() 0093 try: 0094 p.register(p) 0095 except TypeError: 0096 pass 0097 else: 0098 print "Bogus register call did not raise TypeError" 0099 try: 0100 p.unregister(p) 0101 except TypeError: 0102 pass 0103 else: 0104 print "Bogus unregister call did not raise TypeError" 0105 0106 # can't unregister non-existent object 0107 p = select.poll() 0108 try: 0109 p.unregister(3) 0110 except KeyError: 0111 pass 0112 else: 0113 print "Bogus unregister call did not raise KeyError" 0114 0115 # Test error cases 0116 pollster = select.poll() 0117 class Nope: 0118 pass 0119 0120 class Almost: 0121 def fileno(self): 0122 return 'fileno' 0123 0124 try: 0125 pollster.register( Nope(), 0 ) 0126 except TypeError: pass 0127 else: print 'expected TypeError exception, not raised' 0128 0129 try: 0130 pollster.register( Almost(), 0 ) 0131 except TypeError: pass 0132 else: print 'expected TypeError exception, not raised' 0133 0134 0135 # Another test case for poll(). This is copied from the test case for 0136 # select(), modified to use poll() instead. 0137 0138 def test_poll2(): 0139 print 'Running poll test 2' 0140 cmd = 'for i in 0 1 2 3 4 5 6 7 8 9; do echo testing...; sleep 1; done' 0141 p = os.popen(cmd, 'r') 0142 pollster = select.poll() 0143 pollster.register( p, select.POLLIN ) 0144 for tout in (0, 1000, 2000, 4000, 8000, 16000) + (-1,)*10: 0145 if verbose: 0146 print 'timeout =', tout 0147 fdlist = pollster.poll(tout) 0148 if (fdlist == []): 0149 continue 0150 fd, flags = fdlist[0] 0151 if flags & select.POLLHUP: 0152 line = p.readline() 0153 if line != "": 0154 print 'error: pipe seems to be closed, but still returns data' 0155 continue 0156 0157 elif flags & select.POLLIN: 0158 line = p.readline() 0159 if verbose: 0160 print repr(line) 0161 if not line: 0162 if verbose: 0163 print 'EOF' 0164 break 0165 continue 0166 else: 0167 print 'Unexpected return value from select.poll:', fdlist 0168 p.close() 0169 print 'Poll test 2 complete' 0170 0171 test_poll1() 0172 test_poll2() 0173
Generated by PyXR 0.9.4