PyXR

c:\python24\lib\test\test_fork1.py



0001 """This test checks for correct fork() behavior.
0002 
0003 We want fork1() semantics -- only the forking thread survives in the
0004 child after a fork().
0005 
0006 On some systems (e.g. Solaris without posix threads) we find that all
0007 active threads survive in the child after a fork(); this is an error.
0008 
0009 While BeOS doesn't officially support fork and native threading in
0010 the same application, the present example should work just fine.  DC
0011 """
0012 
0013 import os, sys, time, thread
0014 from test.test_support import verify, verbose, TestSkipped
0015 
# Without os.fork there is nothing to test on this platform.
if not hasattr(os, "fork"):
    raise TestSkipped("os.fork not defined -- skipping test_fork1")

# Seconds main() sleeps before the fork, and the child sleeps after it.
LONGSLEEP = 2

# Seconds each worker thread sleeps between updates of ``alive``.
SHORTSLEEP = 0.5

# Number of worker threads started by main().
NUM_THREADS = 4

# Maps worker id -> pid most recently recorded by that worker (see f()).
alive = {}

# Set to a true value by main() to make the worker loops in f() exit.
stop = 0
0030 
def f(id):
    """Worker thread body: keep recording the current pid under *id*.

    Until the module-level ``stop`` flag becomes true, store os.getpid()
    in ``alive[id]`` and then sleep for SHORTSLEEP seconds.
    """
    while True:
        if stop:
            break
        alive[id] = os.getpid()
        try:
            time.sleep(SHORTSLEEP)
        except IOError:
            # An IOError raised by the sleep is deliberately ignored; the
            # loop simply goes around again.
            pass
0038 
def main():
    """Check that only the forking thread survives in a forked child.

    Starts NUM_THREADS worker threads running f(), waits LONGSLEEP seconds
    and verifies that every worker has recorded itself in ``alive``, then
    forks.

    Child: sleeps LONGSLEEP seconds, counts the ``alive`` entries that
    differ from the pre-fork snapshot, and exits with that count as its
    exit status via os._exit().

    Parent: waits for the child and verifies that its status is 0.

    Whatever the outcome in the parent, the worker threads are told to
    stop before this function returns or raises, so a failed verify() does
    not leave them running in the test process.
    """
    global stop

    for i in range(NUM_THREADS):
        thread.start_new(f, (i,))

    try:
        # Give every worker time to record its pid at least once.
        time.sleep(LONGSLEEP)

        verify(sorted(alive.keys()) == list(range(NUM_THREADS)))

        # Snapshot taken in the parent; the child compares against it.
        prefork_lives = alive.copy()

        if sys.platform in ['unixware7']:
            cpid = os.fork1()
        else:
            cpid = os.fork()

        if cpid == 0:
            # Child: after sleeping, count the entries whose recorded pid
            # no longer matches the pre-fork snapshot.
            time.sleep(LONGSLEEP)
            n = 0
            for key in alive.keys():
                if alive[key] != prefork_lives[key]:
                    n += 1
            # os._exit() leaves immediately, so the finally clause below
            # is not executed in the child on this path.
            os._exit(n)
        else:
            # Parent
            spid, status = os.waitpid(cpid, 0)
            verify(spid == cpid)
            verify(status == 0,
                   "cause = %d, exit = %d" % (status & 0xff, status >> 8))
    finally:
        # Tell threads to die, even when a verify() above has failed.
        stop = 1
        time.sleep(2 * SHORTSLEEP)  # Wait for threads to die
0074 
0075 main()
0076 

Generated by PyXR 0.9.4
SourceForge.net Logo