"""Helpers shared by COM-related unit tests.

Provides:

* ``CheckClean`` - warn about COM interface/gateway objects still alive.
* ``RegisterPythonServer`` / ``ExecuteShellCommand`` - run external commands.
* ``assertRaisesCOM_HRESULT`` - assert a call raises a specific HRESULT.
* ``CaptureWriter`` - temporarily capture ``sys.stdout``/``sys.stderr``.
* ``LeakTestCase`` / ``TestLoader`` - wrap tests with leak checks.
* ``testmain`` - ``unittest.main`` using the leak-checking loader.

The statements below are written in the subset of syntax that parses the
same way on Python 2 and Python 3 (single-argument ``print(...)``,
``raise Exc(msg)``, ``sys.exc_info()`` instead of ``except Exc, name``).
"""
import sys
import os
import win32api
import tempfile
import unittest
import gc
import pythoncom
import winerror
try:
    import cStringIO as StringIO
except ImportError:
    # cStringIO does not exist on Python 3; keep the module-level name
    # ``StringIO`` bound to a module that still offers a ``StringIO`` class.
    import io as StringIO
from pythoncom import _GetInterfaceCount, _GetGatewayCount


def CheckClean():
    """Print a warning for each kind of COM object that is still alive."""
    # Ensure no lingering exceptions - Python should have zero outstanding
    # COM objects
    sys.exc_traceback = sys.exc_value = sys.exc_type = None
    c = _GetInterfaceCount()
    if c:
        print("Warning - %d com interface objects still alive" % c)
    c = _GetGatewayCount()
    if c:
        print("Warning - %d com gateway objects still alive" % c)


def RegisterPythonServer(filename, verbose=0):
    """Run *filename* with the current executable, discarding its output.

    Raises RuntimeError if the command exits with a non-zero status.
    """
    cmd = '%s "%s" > nul 2>&1' % (win32api.GetModuleFileName(0), filename)
    if verbose:
        print("Registering engine %s" % (filename,))
    rc = os.system(cmd)
    if rc:
        raise RuntimeError("Registration of engine '%s' failed" % filename)


def _shell_failure_reason(rc, output, expected_output, tracebacks_ok):
    """Return why a shell command is considered failed, or None if it is OK."""
    if rc:
        return "exit code was " + str(rc)
    if expected_output is not None and output != expected_output:
        return "Expected output %r (got %r)" % (expected_output, output)
    if not tracebacks_ok and "Traceback (most recent call last)" in output:
        return "traceback in program output"
    return None


def ExecuteShellCommand(cmd, testcase,
                        expected_output = None, # Set to '' to check for nothing
                        tracebacks_ok = 0, # OK if the output contains a t/b?
                        ):
    """Run *cmd* through the shell and return its stripped combined output.

    stdout and stderr are redirected to a temporary file, which is read back
    and then removed.  The command is treated as failed when it exits with a
    non-zero status, when *expected_output* is not None and differs from the
    output, or when the output contains a traceback and *tracebacks_ok* is
    false.  On failure the details are printed and ``testcase.fail`` is
    called.
    """
    # mkstemp creates the file atomically (mktemp only invents a name, which
    # is race-prone); close our handle so the shell redirect can write to it.
    fd, output_name = tempfile.mkstemp('win32com_test')
    os.close(fd)
    try:
        cmd = cmd + ' > "%s" 2>&1' % output_name
        rc = os.system(cmd)
        f = open(output_name, "r")
        try:
            output = f.read().strip()
        finally:
            f.close()
    finally:
        # Best-effort cleanup: a temp file we cannot delete must not mask
        # the real outcome of the command.
        try:
            os.remove(output_name)
        except OSError:
            pass
    why = _shell_failure_reason(rc, output, expected_output, tracebacks_ok)
    if why is None:
        return output
    print("Failed to exec command '%r'" % cmd)
    print("Failed as %s" % why)
    print("** start of program output **")
    print(output)
    print("** end of program output **")
    testcase.fail("Executing '%s' failed as %s" % (cmd, why))


def assertRaisesCOM_HRESULT(testcase, hresult, func, *args, **kw):
    """Fail *testcase* unless ``func(*args, **kw)`` raises a
    ``pythoncom.com_error`` whose first argument equals *hresult*.

    A ``com_error`` carrying a different HRESULT is swallowed and reported
    as a failure; other exception types propagate unchanged.
    """
    try:
        func(*args, **kw)
    except pythoncom.com_error:
        # sys.exc_info() avoids the version-specific "except X, name" /
        # "except X as name" spellings.
        details = sys.exc_info()[1]
        if details.args[0] == hresult:
            return
    testcase.fail("Expected COM exception with HRESULT 0x%x" % hresult)


class CaptureWriter:
    """File-like object that can stand in for sys.stdout and sys.stderr."""
    def __init__(self):
        self.old_err = self.old_out = None
        self.clear()
    def capture(self):
        """Discard earlier text and start capturing stdout and stderr."""
        self.clear()
        self.old_out = sys.stdout
        self.old_err = sys.stderr
        sys.stdout = sys.stderr = self
    def release(self):
        """Restore whichever streams were replaced by capture()."""
        if self.old_out:
            sys.stdout = self.old_out
            self.old_out = None
        if self.old_err:
            sys.stderr = self.old_err
            self.old_err = None
    def clear(self):
        self.captured = []
    def write(self, msg):
        self.captured.append(msg)
    def get_captured(self):
        """Return everything written so far as one string."""
        return "".join(self.captured)
    def get_num_lines_captured(self):
        return len(self.get_captured().split("\n"))


class LeakTestCase(unittest.TestCase):
    """Wrap another test and fail it if running it leaks.

    COM interface/gateway counts are always compared before and after the
    wrapped test.  When the interpreter provides ``sys.gettotalrefcount``
    the wrapped test is also re-run ``num_leak_iters`` times to look for
    reference-count drift.
    """
    def __init__(self, real_test):
        unittest.TestCase.__init__(self)
        self.real_test = real_test
        self.num_test_cases = 1
        self.num_leak_iters = 2 # seems to be enough!
        if hasattr(sys, "gettotalrefcount"):
            self.num_test_cases = self.num_test_cases + self.num_leak_iters
    def countTestCases(self):
        return self.num_test_cases
    def runTest(self):
        # Exists only so TestCase.__init__ finds its default method name.
        # Raised explicitly because a plain assert vanishes under -O.
        raise AssertionError("not used")
    def _add_failure(self, result, msg):
        """Record *msg* against the wrapped test as an assertion failure."""
        # addFailure expects an exc_info triple whose value is an exception
        # instance, not a bare string.
        exc = AssertionError(msg)
        result.addFailure(self.real_test, (exc.__class__, exc, None))
    def __call__(self, result = None):
        # The code below dereferences result, so None needs a real object.
        if result is None:
            result = self.defaultTestResult()
        # Always ensure we don't leak gateways/interfaces
        gc.collect()
        ni = _GetInterfaceCount()
        ng = _GetGatewayCount()
        self.real_test(result)
        # Failed - no point checking anything else
        if result.shouldStop or not result.wasSuccessful():
            return
        self._do_leak_tests(result)
        gc.collect()
        lost_i = _GetInterfaceCount() - ni
        lost_g = _GetGatewayCount() - ng
        if lost_i or lost_g:
            msg = "%d interface objects and %d gateway objects leaked" \
                  % (lost_i, lost_g)
            self._add_failure(result, msg)
    def _do_leak_tests(self, result = None):
        """Re-run the wrapped test and report total-refcount drift."""
        try:
            gtrc = sys.gettotalrefcount
        except AttributeError:
            return # can't do leak tests in this build
        if result is None:
            result = self.defaultTestResult()
        # Assume already called once, to prime any caches etc
        trc = gtrc()
        for i in range(self.num_leak_iters):
            self.real_test(result)
            if result.shouldStop:
                break
        del i # created after we remembered the refcount!
        # int division here means one or 2 stray references won't force
        # failure, but one per loop
        lost = (gtrc() - trc) // self.num_leak_iters
        if lost < 0:
            msg = "LeakTest: %s appeared to gain %d references!!" \
                  % (self.real_test, -lost)
            self._add_failure(result, msg)
        if lost > 0:
            msg = "LeakTest: %s lost %d references" % (self.real_test, lost)
            self._add_failure(result, msg)


class TestLoader(unittest.TestLoader):
    """Loader that wraps individual test cases in LeakTestCase."""
    def loadTestsFromTestCase(self, testCaseClass):
        """Return a suite of all tests cases contained in testCaseClass"""
        leak_tests = []
        for name in self.getTestCaseNames(testCaseClass):
            real_test = testCaseClass(name)
            leak_test = self._getTestWrapper(real_test)
            leak_tests.append(leak_test)
        return self.suiteClass(leak_tests)
    def _getTestWrapper(self, test):
        """Wrap *test* unless it sets a true ``no_leak_tests`` attribute."""
        no_leak_tests = getattr(test, "no_leak_tests", False)
        if no_leak_tests:
            print("Test says it doesn't want leak tests!")
            return test
        return LeakTestCase(test)
    def loadTestsFromModule(self, mod):
        # A module-level suite() function takes precedence over discovery.
        if hasattr(mod, "suite"):
            return mod.suite()
        else:
            return unittest.TestLoader.loadTestsFromModule(self, mod)
    def loadTestsFromName(self, name, module=None):
        test = unittest.TestLoader.loadTestsFromName(self, name, module)
        if isinstance(test, unittest.TestSuite):
            pass # hmmm? print "Don't wrap suites yet!", test._tests
        elif isinstance(test, unittest.TestCase):
            test = self._getTestWrapper(test)
        else:
            print("XXX - what is %s" % (test,))
        return test


# We used to override some of this (and may later!)
TestCase = unittest.TestCase


def CapturingFunctionTestCase(*args, **kw):
    """Build a _CapturingFunctionTestCase wrapped in a LeakTestCase."""
    real_test = _CapturingFunctionTestCase(*args, **kw)
    return LeakTestCase(real_test)


class _CapturingFunctionTestCase(unittest.FunctionTestCase):#, TestCaseMixin):
    """FunctionTestCase that captures output and flags tracebacks in it."""
    def __call__(self, result=None):
        if result is None: result = self.defaultTestResult()
        writer = CaptureWriter()
        writer.capture()
        try:
            unittest.FunctionTestCase.__call__(self, result)
            # NOTE(review): run_leak_tests is not defined anywhere in this
            # file; this branch only runs if something sets do_leak_tests.
            if getattr(self, "do_leak_tests", 0) and hasattr(sys, "gettotalrefcount"):
                self.run_leak_tests(result)
        finally:
            writer.release()
        output = writer.get_captured()
        self.checkOutput(output, result)
        # A plain unittest.TestResult has no showAll attribute, so treat a
        # missing one as "not verbose".
        if getattr(result, "showAll", 0):
            print(output)
    def checkOutput(self, output, result):
        """Record an error on *result* if *output* mentions a traceback."""
        if output.find("Traceback")>=0:
            msg = "Test output contained a traceback\n---\n%s\n---" % output
            result.errors.append((self, msg))


class ShellTestCase(unittest.TestCase):
    """Test case that runs a shell command and checks its output."""
    def __init__(self, cmd, expected_output):
        self.__cmd = cmd
        self.__eo = expected_output
        unittest.TestCase.__init__(self)
    def runTest(self):
        ExecuteShellCommand(self.__cmd, self, self.__eo)
    def __str__(self):
        # Keep the description short: show at most the first 30 characters.
        max_len = 30
        if len(self.__cmd)>max_len:
            cmd_repr = self.__cmd[:max_len] + "..."
        else:
            cmd_repr = self.__cmd
        return "exec: " + cmd_repr


def testmain(*args, **kw):
    """Call unittest.main, defaulting ``testLoader`` to this TestLoader."""
    new_kw = kw.copy()
    if 'testLoader' not in new_kw:
        new_kw['testLoader'] = TestLoader()
    unittest.main(*args, **new_kw)

# Footer of the listing this source was recovered from:
# Generated by PyXR 0.9.4