0001 # Very rudimentary test of thread module 0002 0003 # Create a bunch of threads, let each do some work, wait until all are done 0004 0005 from test.test_support import verbose 0006 import random 0007 import thread 0008 import time 0009 0010 mutex = thread.allocate_lock() 0011 rmutex = thread.allocate_lock() # for calls to random 0012 running = 0 0013 done = thread.allocate_lock() 0014 done.acquire() 0015 0016 numtasks = 10 0017 0018 def task(ident): 0019 global running 0020 rmutex.acquire() 0021 delay = random.random() * numtasks 0022 rmutex.release() 0023 if verbose: 0024 print 'task', ident, 'will run for', round(delay, 1), 'sec' 0025 time.sleep(delay) 0026 if verbose: 0027 print 'task', ident, 'done' 0028 mutex.acquire() 0029 running = running - 1 0030 if running == 0: 0031 done.release() 0032 mutex.release() 0033 0034 next_ident = 0 0035 def newtask(): 0036 global next_ident, running 0037 mutex.acquire() 0038 next_ident = next_ident + 1 0039 if verbose: 0040 print 'creating task', next_ident 0041 thread.start_new_thread(task, (next_ident,)) 0042 running = running + 1 0043 mutex.release() 0044 0045 for i in range(numtasks): 0046 newtask() 0047 0048 print 'waiting for all tasks to complete' 0049 done.acquire() 0050 print 'all tasks done' 0051 0052 class barrier: 0053 def __init__(self, n): 0054 self.n = n 0055 self.waiting = 0 0056 self.checkin = thread.allocate_lock() 0057 self.checkout = thread.allocate_lock() 0058 self.checkout.acquire() 0059 0060 def enter(self): 0061 checkin, checkout = self.checkin, self.checkout 0062 0063 checkin.acquire() 0064 self.waiting = self.waiting + 1 0065 if self.waiting == self.n: 0066 self.waiting = self.n - 1 0067 checkout.release() 0068 return 0069 checkin.release() 0070 0071 checkout.acquire() 0072 self.waiting = self.waiting - 1 0073 if self.waiting == 0: 0074 checkin.release() 0075 return 0076 checkout.release() 0077 0078 numtrips = 3 0079 def task2(ident): 0080 global running 0081 for i in range(numtrips): 0082 if ident == 0: 0083 # give it a good chance to enter the next 0084 # barrier before the others are all out 0085 # of the current one 0086 delay = 0.001 0087 else: 0088 rmutex.acquire() 0089 delay = random.random() * numtasks 0090 rmutex.release() 0091 if verbose: 0092 print 'task', ident, 'will run for', round(delay, 1), 'sec' 0093 time.sleep(delay) 0094 if verbose: 0095 print 'task', ident, 'entering barrier', i 0096 bar.enter() 0097 if verbose: 0098 print 'task', ident, 'leaving barrier', i 0099 mutex.acquire() 0100 running -= 1 0101 # Must release mutex before releasing done, else the main thread can 0102 # exit and set mutex to None as part of global teardown; then 0103 # mutex.release() raises AttributeError. 0104 finished = running == 0 0105 mutex.release() 0106 if finished: 0107 done.release() 0108 0109 print '\n*** Barrier Test ***' 0110 if done.acquire(0): 0111 raise ValueError, "'done' should have remained acquired" 0112 bar = barrier(numtasks) 0113 running = numtasks 0114 for i in range(numtasks): 0115 thread.start_new_thread(task2, (i,)) 0116 done.acquire() 0117 print 'all tasks done' 0118
Generated by PyXR 0.9.4