# Very rudimentary test of threading module

# Create a bunch of threads, let each do some work, wait until all are done

from test.test_support import verbose
import random
import threading
import time

# This takes about n/3 seconds to run (about n/3 clumps of tasks, times
# about 1 second per clump).
numtasks = 10

# no more than 3 of the 10 can run at once
sema = threading.BoundedSemaphore(value=3)
mutex = threading.RLock()
# Number of tasks currently inside the semaphore-guarded section; only
# modified while ``mutex`` is held.
running = 0


class TestThread(threading.Thread):
    """Worker thread that sleeps for a random delay while holding a slot.

    Each worker takes one slot of the module-level semaphore ``sema``,
    bumps the shared ``running`` counter under ``mutex``, sleeps for a
    random time of up to two seconds, then decrements the counter and
    gives the slot back.
    """

    def run(self):
        """Do the simulated work, reporting progress when ``verbose`` is set."""
        global running
        delay = random.random() * 2
        # Each message is formatted into one string before printing: the
        # text is unchanged, and the statement parses under both Python 2
        # and Python 3.
        if verbose:
            print('task %s will run for %s sec' % (self.getName(), delay))
        sema.acquire()
        try:
            mutex.acquire()
            try:
                running = running + 1
                if verbose:
                    print('%s tasks are running' % (running,))
            finally:
                mutex.release()
            time.sleep(delay)
            if verbose:
                print('task %s done' % (self.getName(),))
            mutex.acquire()
            try:
                running = running - 1
                if verbose:
                    print('%s is finished. %s tasks are running'
                          % (self.getName(), running))
            finally:
                mutex.release()
        finally:
            # Always give the slot back, even if something above raised;
            # otherwise the remaining workers could block forever on
            # sema.acquire() and the join() loop below would hang.
            sema.release()


# Every started worker, kept so the main thread can join them all below.
threads = []


def starttasks():
    """Create and start ``numtasks`` workers, recording each in ``threads``."""
    for i in range(numtasks):
        t = TestThread(name="<thread %d>" % i)
        threads.append(t)
        t.start()


starttasks()

if verbose:
    print('waiting for all tasks to complete')
for t in threads:
    t.join()
if verbose:
    print('all tasks done')
# Generated by PyXR 0.9.4