PyXR

c:\python24\lib\test\test_poll.py



0001 # Test case for the select.poll() function
0002 
0003 import sys, os, select, random
0004 from test.test_support import verify, verbose, TestSkipped, TESTFN
0005 
# Nothing below can run if this build of the select module lacks poll(),
# so skip the whole test module up front.
if not hasattr(select, 'poll'):
    raise TestSkipped("select.poll not defined -- skipping test_poll")
0010 
0011 
def find_ready_matching(ready, flag):
    """Return the descriptors in *ready* whose event mask includes *flag*.

    *ready* is an iterable of ``(fd, mode)`` pairs, such as the list
    returned by a poll object's ``poll()`` method.  A descriptor is kept
    when ``mode & flag`` is non-zero.  The result is a new list in the
    same order as *ready*.
    """
    return [fd for fd, mode in ready if mode & flag]
0018 
def test_poll1():
    """Basic functional test of a poll object.

    Create a bunch of pipes and register both ends of each with a single
    poll object.  Then, until every pipe has been used: write a message to
    a randomly chosen writable pipe, read it back from a randomly chosen
    readable pipe, and close and unregister that pipe.  Finishes by
    running poll_unit_tests().

    Raises RuntimeError if poll() reports no pipe ready for writing or
    reading at a point where one is expected.
    """
    print('Running poll test 1')
    p = select.poll()

    NUM_PIPES = 12
    MSG = " This is a test."
    MSG_LEN = len(MSG)
    # Maps the read end of every still-open pipe to its write end.
    r2w = {}

    try:
        for i in range(NUM_PIPES):
            rd, wr = os.pipe()
            r2w[rd] = wr
            p.register(rd, select.POLLIN)
            p.register(wr, select.POLLOUT)

        while r2w:
            ready = p.poll()
            ready_writers = find_ready_matching(ready, select.POLLOUT)
            if not ready_writers:
                raise RuntimeError("no pipes ready for writing")
            wr = random.choice(ready_writers)
            os.write(wr, MSG)

            ready = p.poll()
            ready_readers = find_ready_matching(ready, select.POLLIN)
            if not ready_readers:
                raise RuntimeError("no pipes ready for reading")
            rd = random.choice(ready_readers)
            buf = os.read(rd, MSG_LEN)
            verify(len(buf) == MSG_LEN)
            print(buf)

            # This pipe is done: close both ends, then stop polling them.
            wr = r2w[rd]
            os.close(wr)
            os.close(rd)
            p.unregister(wr)
            p.unregister(rd)
            del r2w[rd]
    finally:
        # r2w is only non-empty here if something above failed; close
        # whatever is left so a failing run does not leak descriptors.
        for rd, wr in r2w.items():
            for fd in (rd, wr):
                try:
                    os.close(fd)
                except OSError:
                    # The failure may have happened between the two
                    # close() calls, so this end can already be closed.
                    pass

    poll_unit_tests()
    print('Poll test 1 complete')
0067 
def poll_unit_tests():
    """Check how poll objects handle invalid descriptors and bad arguments.

    Covers, in order: POLLNVAL for a descriptor that is not open,
    registering a file object and polling it before and after it is
    closed, TypeError from register()/unregister() for unusable arguments,
    and KeyError from unregister() for a descriptor never registered.

    Descriptor checks go through verify(); a missing TypeError or KeyError
    is reported by printing a message instead of raising.
    """
    # returns NVAL for invalid file descriptor
    FD = 42
    try:
        os.close(FD)
    except OSError:
        # FD was not open, which is exactly the state this check needs.
        pass
    p = select.poll()
    p.register(FD)
    r = p.poll()
    verify(r[0] == (FD, select.POLLNVAL))

    # a file object can be registered directly; once it is closed the
    # poll object reports NVAL for its descriptor
    f = open(TESTFN, 'w')
    try:
        fd = f.fileno()
        p = select.poll()
        p.register(f)
        r = p.poll()
        verify(r[0][0] == fd)
        f.close()
        r = p.poll()
        verify(r[0] == (fd, select.POLLNVAL))
    finally:
        # Runs on failure too, so the scratch file is never left open or
        # left behind.  Closing an already-closed file object is harmless.
        f.close()
        os.unlink(TESTFN)

    # type error for invalid arguments
    p = select.poll()
    try:
        p.register(p)
    except TypeError:
        pass
    else:
        print("Bogus register call did not raise TypeError")
    try:
        p.unregister(p)
    except TypeError:
        pass
    else:
        print("Bogus unregister call did not raise TypeError")

    # can't unregister non-existent object
    p = select.poll()
    try:
        p.unregister(3)
    except KeyError:
        pass
    else:
        print("Bogus unregister call did not raise KeyError")

    # Test error cases
    pollster = select.poll()

    class Nope:
        # Has no fileno() method at all.
        pass

    class Almost:
        # Has a fileno() method, but it does not return an integer.
        def fileno(self):
            return 'fileno'

    try:
        pollster.register(Nope(), 0)
    except TypeError:
        pass
    else:
        print('expected TypeError exception, not raised')

    try:
        pollster.register(Almost(), 0)
    except TypeError:
        pass
    else:
        print('expected TypeError exception, not raised')
0133 
0134 
0135 # Another test case for poll().  This is copied from the test case for
0136 # select(), modified to use poll() instead.
0137 
def test_poll2():
    """Poll the output pipe of a slow shell command with varying timeouts.

    Starts a shell loop that echoes a line and sleeps, ten times over, and
    registers its output pipe for POLLIN.  Each timeout in the schedule is
    passed to poll(); an empty result just moves on to the next timeout.
    Otherwise a line is read from the pipe:

    - on POLLHUP, any data still returned is reported as an error;
    - on POLLIN, an empty line means EOF and ends the loop;
    - any other flags are reported as an unexpected return value.

    Per-iteration details are printed only when ``verbose`` is set.
    """
    print('Running poll test 2')
    cmd = 'for i in 0 1 2 3 4 5 6 7 8 9; do echo testing...; sleep 1; done'
    p = os.popen(cmd, 'r')
    try:
        pollster = select.poll()
        pollster.register(p, select.POLLIN)
        for tout in (0, 1000, 2000, 4000, 8000, 16000) + (-1,)*10:
            if verbose:
                print('timeout = %s' % tout)
            fdlist = pollster.poll(tout)
            if not fdlist:
                # Nothing became ready within this timeout.
                continue
            # Only one object is registered, so look at the first entry.
            flags = fdlist[0][1]
            if flags & select.POLLHUP:
                line = p.readline()
                if line != "":
                    print('error: pipe seems to be closed, but still returns data')
            elif flags & select.POLLIN:
                line = p.readline()
                if verbose:
                    print(repr(line))
                if not line:
                    if verbose:
                        print('EOF')
                    break
            else:
                print('Unexpected return value from select.poll: %s' % (fdlist,))
    finally:
        # Close the pipe even if polling or reading raised.
        p.close()
    print('Poll test 2 complete')
0170 
# Both tests run, in order, as soon as this module is executed or imported.
for run_test in (test_poll1, test_poll2):
    run_test()
0173 

Generated by PyXR 0.9.4
SourceForge.net Logo