PyXR

c:\python24\lib \ test \ test_thread.py



0001 # Very rudimentary test of thread module
0002 
0003 # Create a bunch of threads, let each do some work, wait until all are done
0004 
0005 from test.test_support import verbose
0006 import random
0007 import thread
0008 import time
0009 
0010 mutex = thread.allocate_lock()
0011 rmutex = thread.allocate_lock() # for calls to random
0012 running = 0
0013 done = thread.allocate_lock()
0014 done.acquire()
0015 
0016 numtasks = 10
0017 
0018 def task(ident):
0019     global running
0020     rmutex.acquire()
0021     delay = random.random() * numtasks
0022     rmutex.release()
0023     if verbose:
0024         print 'task', ident, 'will run for', round(delay, 1), 'sec'
0025     time.sleep(delay)
0026     if verbose:
0027         print 'task', ident, 'done'
0028     mutex.acquire()
0029     running = running - 1
0030     if running == 0:
0031         done.release()
0032     mutex.release()
0033 
0034 next_ident = 0
0035 def newtask():
0036     global next_ident, running
0037     mutex.acquire()
0038     next_ident = next_ident + 1
0039     if verbose:
0040         print 'creating task', next_ident
0041     thread.start_new_thread(task, (next_ident,))
0042     running = running + 1
0043     mutex.release()
0044 
0045 for i in range(numtasks):
0046     newtask()
0047 
0048 print 'waiting for all tasks to complete'
0049 done.acquire()
0050 print 'all tasks done'
0051 
0052 class barrier:
0053     def __init__(self, n):
0054         self.n = n
0055         self.waiting = 0
0056         self.checkin  = thread.allocate_lock()
0057         self.checkout = thread.allocate_lock()
0058         self.checkout.acquire()
0059 
0060     def enter(self):
0061         checkin, checkout = self.checkin, self.checkout
0062 
0063         checkin.acquire()
0064         self.waiting = self.waiting + 1
0065         if self.waiting == self.n:
0066             self.waiting = self.n - 1
0067             checkout.release()
0068             return
0069         checkin.release()
0070 
0071         checkout.acquire()
0072         self.waiting = self.waiting - 1
0073         if self.waiting == 0:
0074             checkin.release()
0075             return
0076         checkout.release()
0077 
0078 numtrips = 3
0079 def task2(ident):
0080     global running
0081     for i in range(numtrips):
0082         if ident == 0:
0083             # give it a good chance to enter the next
0084             # barrier before the others are all out
0085             # of the current one
0086             delay = 0.001
0087         else:
0088             rmutex.acquire()
0089             delay = random.random() * numtasks
0090             rmutex.release()
0091         if verbose:
0092             print 'task', ident, 'will run for', round(delay, 1), 'sec'
0093         time.sleep(delay)
0094         if verbose:
0095             print 'task', ident, 'entering barrier', i
0096         bar.enter()
0097         if verbose:
0098             print 'task', ident, 'leaving barrier', i
0099     mutex.acquire()
0100     running -= 1
0101     # Must release mutex before releasing done, else the main thread can
0102     # exit and set mutex to None as part of global teardown; then
0103     # mutex.release() raises AttributeError.
0104     finished = running == 0
0105     mutex.release()
0106     if finished:
0107         done.release()
0108 
0109 print '\n*** Barrier Test ***'
0110 if done.acquire(0):
0111     raise ValueError, "'done' should have remained acquired"
0112 bar = barrier(numtasks)
0113 running = numtasks
0114 for i in range(numtasks):
0115     thread.start_new_thread(task2, (i,))
0116 done.acquire()
0117 print 'all tasks done'
0118 

Generated by PyXR 0.9.4
SourceForge.net Logo