0001 # Some simple Queue module tests, plus some failure conditions 0002 # to ensure the Queue locks remain stable. 0003 import Queue 0004 import sys 0005 import threading 0006 import time 0007 0008 from test.test_support import verify, TestFailed, verbose 0009 0010 QUEUE_SIZE = 5 0011 0012 # A thread to run a function that unclogs a blocked Queue. 0013 class _TriggerThread(threading.Thread): 0014 def __init__(self, fn, args): 0015 self.fn = fn 0016 self.args = args 0017 self.startedEvent = threading.Event() 0018 threading.Thread.__init__(self) 0019 0020 def run(self): 0021 # The sleep isn't necessary, but is intended to give the blocking 0022 # function in the main thread a chance at actually blocking before 0023 # we unclog it. But if the sleep is longer than the timeout-based 0024 # tests wait in their blocking functions, those tests will fail. 0025 # So we give them much longer timeout values compared to the 0026 # sleep here (I aimed at 10 seconds for blocking functions -- 0027 # they should never actually wait that long - they should make 0028 # progress as soon as we call self.fn()). 0029 time.sleep(0.1) 0030 self.startedEvent.set() 0031 self.fn(*self.args) 0032 0033 # Execute a function that blocks, and in a separate thread, a function that 0034 # triggers the release. Returns the result of the blocking function. 0035 # Caution: block_func must guarantee to block until trigger_func is 0036 # called, and trigger_func must guarantee to change queue state so that 0037 # block_func can make enough progress to return. In particular, a 0038 # block_func that just raises an exception regardless of whether trigger_func 0039 # is called will lead to timing-dependent sporadic failures, and one of 0040 # those went rarely seen but undiagnosed for years. Now block_func 0041 # must be unexceptional. If block_func is supposed to raise an exception, 0042 # call _doExceptionalBlockingTest() instead. 0043 def _doBlockingTest(block_func, block_args, trigger_func, trigger_args): 0044 t = _TriggerThread(trigger_func, trigger_args) 0045 t.start() 0046 result = block_func(*block_args) 0047 # If block_func returned before our thread made the call, we failed! 0048 if not t.startedEvent.isSet(): 0049 raise TestFailed("blocking function '%r' appeared not to block" % 0050 block_func) 0051 t.join(10) # make sure the thread terminates 0052 if t.isAlive(): 0053 raise TestFailed("trigger function '%r' appeared to not return" % 0054 trigger_func) 0055 return result 0056 0057 # Call this instead if block_func is supposed to raise an exception. 0058 def _doExceptionalBlockingTest(block_func, block_args, trigger_func, 0059 trigger_args, expected_exception_class): 0060 t = _TriggerThread(trigger_func, trigger_args) 0061 t.start() 0062 try: 0063 try: 0064 block_func(*block_args) 0065 except expected_exception_class: 0066 raise 0067 else: 0068 raise TestFailed("expected exception of kind %r" % 0069 expected_exception_class) 0070 finally: 0071 t.join(10) # make sure the thread terminates 0072 if t.isAlive(): 0073 raise TestFailed("trigger function '%r' appeared to not return" % 0074 trigger_func) 0075 if not t.startedEvent.isSet(): 0076 raise TestFailed("trigger thread ended but event never set") 0077 0078 # A Queue subclass that can provoke failure at a moment's notice :) 0079 class FailingQueueException(Exception): 0080 pass 0081 0082 class FailingQueue(Queue.Queue): 0083 def __init__(self, *args): 0084 self.fail_next_put = False 0085 self.fail_next_get = False 0086 Queue.Queue.__init__(self, *args) 0087 def _put(self, item): 0088 if self.fail_next_put: 0089 self.fail_next_put = False 0090 raise FailingQueueException, "You Lose" 0091 return Queue.Queue._put(self, item) 0092 def _get(self): 0093 if self.fail_next_get: 0094 self.fail_next_get = False 0095 raise FailingQueueException, "You Lose" 0096 return Queue.Queue._get(self) 0097 0098 def FailingQueueTest(q): 0099 if not q.empty(): 0100 raise RuntimeError, "Call this function with an empty queue" 0101 for i in range(QUEUE_SIZE-1): 0102 q.put(i) 0103 # Test a failing non-blocking put. 0104 q.fail_next_put = True 0105 try: 0106 q.put("oops", block=0) 0107 raise TestFailed("The queue didn't fail when it should have") 0108 except FailingQueueException: 0109 pass 0110 q.fail_next_put = True 0111 try: 0112 q.put("oops", timeout=0.1) 0113 raise TestFailed("The queue didn't fail when it should have") 0114 except FailingQueueException: 0115 pass 0116 q.put("last") 0117 verify(q.full(), "Queue should be full") 0118 # Test a failing blocking put 0119 q.fail_next_put = True 0120 try: 0121 _doBlockingTest(q.put, ("full",), q.get, ()) 0122 raise TestFailed("The queue didn't fail when it should have") 0123 except FailingQueueException: 0124 pass 0125 # Check the Queue isn't damaged. 0126 # put failed, but get succeeded - re-add 0127 q.put("last") 0128 # Test a failing timeout put 0129 q.fail_next_put = True 0130 try: 0131 _doExceptionalBlockingTest(q.put, ("full", True, 10), q.get, (), 0132 FailingQueueException) 0133 raise TestFailed("The queue didn't fail when it should have") 0134 except FailingQueueException: 0135 pass 0136 # Check the Queue isn't damaged. 0137 # put failed, but get succeeded - re-add 0138 q.put("last") 0139 verify(q.full(), "Queue should be full") 0140 q.get() 0141 verify(not q.full(), "Queue should not be full") 0142 q.put("last") 0143 verify(q.full(), "Queue should be full") 0144 # Test a blocking put 0145 _doBlockingTest( q.put, ("full",), q.get, ()) 0146 # Empty it 0147 for i in range(QUEUE_SIZE): 0148 q.get() 0149 verify(q.empty(), "Queue should be empty") 0150 q.put("first") 0151 q.fail_next_get = True 0152 try: 0153 q.get() 0154 raise TestFailed("The queue didn't fail when it should have") 0155 except FailingQueueException: 0156 pass 0157 verify(not q.empty(), "Queue should not be empty") 0158 q.fail_next_get = True 0159 try: 0160 q.get(timeout=0.1) 0161 raise TestFailed("The queue didn't fail when it should have") 0162 except FailingQueueException: 0163 pass 0164 verify(not q.empty(), "Queue should not be empty") 0165 q.get() 0166 verify(q.empty(), "Queue should be empty") 0167 q.fail_next_get = True 0168 try: 0169 _doExceptionalBlockingTest(q.get, (), q.put, ('empty',), 0170 FailingQueueException) 0171 raise TestFailed("The queue didn't fail when it should have") 0172 except FailingQueueException: 0173 pass 0174 # put succeeded, but get failed. 0175 verify(not q.empty(), "Queue should not be empty") 0176 q.get() 0177 verify(q.empty(), "Queue should be empty") 0178 0179 def SimpleQueueTest(q): 0180 if not q.empty(): 0181 raise RuntimeError, "Call this function with an empty queue" 0182 # I guess we better check things actually queue correctly a little :) 0183 q.put(111) 0184 q.put(222) 0185 verify(q.get() == 111 and q.get() == 222, 0186 "Didn't seem to queue the correct data!") 0187 for i in range(QUEUE_SIZE-1): 0188 q.put(i) 0189 verify(not q.empty(), "Queue should not be empty") 0190 verify(not q.full(), "Queue should not be full") 0191 q.put("last") 0192 verify(q.full(), "Queue should be full") 0193 try: 0194 q.put("full", block=0) 0195 raise TestFailed("Didn't appear to block with a full queue") 0196 except Queue.Full: 0197 pass 0198 try: 0199 q.put("full", timeout=0.01) 0200 raise TestFailed("Didn't appear to time-out with a full queue") 0201 except Queue.Full: 0202 pass 0203 # Test a blocking put 0204 _doBlockingTest(q.put, ("full",), q.get, ()) 0205 _doBlockingTest(q.put, ("full", True, 10), q.get, ()) 0206 # Empty it 0207 for i in range(QUEUE_SIZE): 0208 q.get() 0209 verify(q.empty(), "Queue should be empty") 0210 try: 0211 q.get(block=0) 0212 raise TestFailed("Didn't appear to block with an empty queue") 0213 except Queue.Empty: 0214 pass 0215 try: 0216 q.get(timeout=0.01) 0217 raise TestFailed("Didn't appear to time-out with an empty queue") 0218 except Queue.Empty: 0219 pass 0220 # Test a blocking get 0221 _doBlockingTest(q.get, (), q.put, ('empty',)) 0222 _doBlockingTest(q.get, (True, 10), q.put, ('empty',)) 0223 0224 def test(): 0225 q = Queue.Queue(QUEUE_SIZE) 0226 # Do it a couple of times on the same queue 0227 SimpleQueueTest(q) 0228 SimpleQueueTest(q) 0229 if verbose: 0230 print "Simple Queue tests seemed to work" 0231 q = FailingQueue(QUEUE_SIZE) 0232 FailingQueueTest(q) 0233 FailingQueueTest(q) 0234 if verbose: 0235 print "Failing Queue tests seemed to work" 0236 0237 test() 0238
Generated by PyXR 0.9.4