PyXR

c:\python24\lib\site-packages\win32 \ com \ test \ util.py



0001 import sys, os
0002 import win32api
0003 import tempfile
0004 import unittest
0005 import gc
0006 import pythoncom
0007 import winerror
0008 import cStringIO as StringIO
0009 from pythoncom import _GetInterfaceCount, _GetGatewayCount
0010 
0011 def CheckClean():
0012     # Ensure no lingering exceptions - Python should have zero outstanding
0013     # COM objects
0014     sys.exc_traceback = sys.exc_value = sys.exc_type = None
0015     c = _GetInterfaceCount()
0016     if c:
0017         print "Warning - %d com interface objects still alive" % c
0018     c = _GetGatewayCount()
0019     if c:
0020         print "Warning - %d com gateway objects still alive" % c
0021 
0022 def RegisterPythonServer(filename, verbose=0):
0023     cmd = '%s "%s" > nul 2>&1' % (win32api.GetModuleFileName(0), filename)
0024     if verbose:
0025         print "Registering engine", filename
0026 #       print cmd
0027     rc = os.system(cmd)
0028     if rc:
0029         raise RuntimeError, "Registration of engine '%s' failed" % filename
0030 
0031 def ExecuteShellCommand(cmd, testcase,
0032                         expected_output = None, # Set to '' to check for nothing
0033                         tracebacks_ok = 0, # OK if the output contains a t/b?
0034                         ):
0035     output_name = tempfile.mktemp('win32com_test')
0036     cmd = cmd + ' > "%s" 2>&1' % output_name
0037     rc = os.system(cmd)
0038     output = open(output_name, "r").read().strip()
0039     class Failed(Exception): pass
0040     try:
0041         if rc:
0042             raise Failed, "exit code was " + str(rc)
0043         if expected_output is not None and output != expected_output:
0044             raise Failed, \
0045                   "Expected output %r (got %r)" % (expected_output, output)
0046         if not tracebacks_ok and \
0047            output.find("Traceback (most recent call last)")>=0:
0048             raise Failed, "traceback in program output"
0049         return output
0050     except Failed, why:
0051         print "Failed to exec command '%r'" % cmd
0052         print "Failed as", why
0053         print "** start of program output **"
0054         print output
0055         print "** end of program output **"
0056         testcase.fail("Executing '%s' failed as %s" % (cmd, why))
0057 
0058 def assertRaisesCOM_HRESULT(testcase, hresult, func, *args, **kw):
0059     try:
0060         func(*args, **kw)
0061     except pythoncom.com_error, details:
0062         if details[0]==hresult:
0063             return
0064     testcase.fail("Excepected COM exception with HRESULT 0x%x" % hresult)
0065 
0066 class CaptureWriter:
0067     def __init__(self):
0068         self.old_err = self.old_out = None
0069         self.clear()
0070     def capture(self):
0071         self.clear()
0072         self.old_out = sys.stdout
0073         self.old_err = sys.stderr
0074         sys.stdout = sys.stderr = self
0075     def release(self):
0076         if self.old_out:
0077             sys.stdout = self.old_out
0078             self.old_out = None
0079         if self.old_err:
0080             sys.stderr = self.old_err
0081             self.old_err = None
0082     def clear(self):
0083         self.captured = []
0084     def write(self, msg):
0085         self.captured.append(msg)
0086     def get_captured(self):
0087         return "".join(self.captured)
0088     def get_num_lines_captured(self):
0089         return len("".join(self.captured).split("\n"))
0090 
0091 class LeakTestCase(unittest.TestCase):
0092     def __init__(self, real_test):
0093         unittest.TestCase.__init__(self)
0094         self.real_test = real_test
0095         self.num_test_cases = 1
0096         self.num_leak_iters = 2 # seems to be enough!
0097         if hasattr(sys, "gettotalrefcount"):
0098             self.num_test_cases = self.num_test_cases + self.num_leak_iters
0099     def countTestCases(self):
0100         return self.num_test_cases
0101     def runTest(self):
0102         assert 0, "not used"
0103     def __call__(self, result = None):
0104         # Always ensure we don't leak gateways/interfaces
0105         gc.collect()
0106         ni = _GetInterfaceCount()
0107         ng = _GetGatewayCount()
0108         self.real_test(result)
0109         # Failed - no point checking anything else
0110         if result.shouldStop or not result.wasSuccessful():
0111             return
0112         self._do_leak_tests(result)
0113         gc.collect()
0114         lost_i = _GetInterfaceCount() - ni
0115         lost_g = _GetGatewayCount() - ng
0116         if lost_i or lost_g:
0117             msg = "%d interface objects and %d gateway objects leaked" \
0118                                                         % (lost_i, lost_g)
0119             result.addFailure(self.real_test, (AssertionError, msg, None))
0120     def _do_leak_tests(self, result = None):
0121         try:
0122             gtrc = sys.gettotalrefcount
0123         except AttributeError:
0124             return # can't do leak tests in this build
0125             def gtrc():
0126                 return 0
0127         # Assume already called once, to prime any caches etc
0128         trc = gtrc()
0129         for i in range(self.num_leak_iters):
0130             self.real_test(result)
0131             if result.shouldStop:
0132                 break
0133         del i # created after we remembered the refcount!
0134         # int division here means one or 2 stray references won't force 
0135         # failure, but one per loop 
0136         lost = (gtrc() - trc) // self.num_leak_iters
0137         if lost < 0:
0138             msg = "LeakTest: %s appeared to gain %d references!!" % (self.real_test, -lost)
0139             result.addFailure(self.real_test, (AssertionError, msg, None))
0140         if lost > 0:
0141             msg = "LeakTest: %s lost %d references" % (self.real_test, lost)
0142             result.addFailure(self.real_test, (AssertionError, msg, None))
0143 
0144 class TestLoader(unittest.TestLoader):
0145     def loadTestsFromTestCase(self, testCaseClass):
0146         """Return a suite of all tests cases contained in testCaseClass"""
0147         leak_tests = []
0148         for name in self.getTestCaseNames(testCaseClass):
0149             real_test = testCaseClass(name)
0150             leak_test = self._getTestWrapper(real_test)
0151             leak_tests.append(leak_test)
0152         return self.suiteClass(leak_tests)
0153     def _getTestWrapper(self, test):
0154         no_leak_tests = getattr(test, "no_leak_tests", False)
0155         if no_leak_tests:
0156             print "Test says it doesn't want leak tests!"
0157             return test
0158         return LeakTestCase(test)
0159     def loadTestsFromModule(self, mod):
0160         if hasattr(mod, "suite"):
0161             return mod.suite()
0162         else:
0163             return unittest.TestLoader.loadTestsFromModule(self, mod)
0164     def loadTestsFromName(self, name, module=None):
0165         test = unittest.TestLoader.loadTestsFromName(self, name, module)
0166         if isinstance(test, unittest.TestSuite):
0167             pass # hmmm? print "Don't wrap suites yet!", test._tests
0168         elif isinstance(test, unittest.TestCase):
0169             test = self._getTestWrapper(test)
0170         else:
0171             print "XXX - what is", test
0172         return test
0173 
0174 # We used to override some of this (and may later!)
0175 TestCase = unittest.TestCase
0176 
0177 def CapturingFunctionTestCase(*args, **kw):
0178     real_test = _CapturingFunctionTestCase(*args, **kw)
0179     return LeakTestCase(real_test)
0180 
0181 class _CapturingFunctionTestCase(unittest.FunctionTestCase):#, TestCaseMixin):
0182     def __call__(self, result=None):
0183         if result is None: result = self.defaultTestResult()
0184         writer = CaptureWriter()
0185         #self._preTest()
0186         writer.capture()
0187         try:
0188             unittest.FunctionTestCase.__call__(self, result)
0189             if getattr(self, "do_leak_tests", 0) and hasattr(sys, "gettotalrefcount"):
0190                 self.run_leak_tests(result)
0191         finally:
0192             writer.release()
0193             #self._postTest(result)
0194         output = writer.get_captured()
0195         self.checkOutput(output, result)
0196         if result.showAll:
0197             print output
0198     def checkOutput(self, output, result):
0199         if output.find("Traceback")>=0:
0200             msg = "Test output contained a traceback\n---\n%s\n---" % output
0201             result.errors.append((self, msg))
0202 
0203 class ShellTestCase(unittest.TestCase):
0204     def __init__(self, cmd, expected_output):
0205         self.__cmd = cmd
0206         self.__eo = expected_output
0207         unittest.TestCase.__init__(self)
0208     def runTest(self):
0209         ExecuteShellCommand(self.__cmd, self, self.__eo)
0210     def __str__(self):
0211         max = 30
0212         if len(self.__cmd)>max:
0213             cmd_repr = self.__cmd[:max] + "..."
0214         else:
0215             cmd_repr = self.__cmd
0216         return "exec: " + cmd_repr
0217 
0218 def testmain(*args, **kw):
0219     new_kw = kw.copy()
0220     if not new_kw.has_key('testLoader'):
0221         new_kw['testLoader'] = TestLoader()
0222     unittest.main(*args, **new_kw)
0223 

Generated by PyXR 0.9.4
SourceForge.net Logo