0001 """ 0002 TestCases for testing the locking sub-system. 0003 """ 0004 0005 import sys, os, string 0006 import tempfile 0007 import time 0008 from pprint import pprint 0009 0010 try: 0011 from threading import Thread, currentThread 0012 have_threads = 1 0013 except ImportError: 0014 have_threads = 0 0015 0016 0017 import unittest 0018 from test_all import verbose 0019 0020 try: 0021 # For Pythons w/distutils pybsddb 0022 from bsddb3 import db 0023 except ImportError: 0024 # For Python 2.3 0025 from bsddb import db 0026 0027 0028 #---------------------------------------------------------------------- 0029 0030 class LockingTestCase(unittest.TestCase): 0031 0032 def setUp(self): 0033 homeDir = os.path.join(os.path.dirname(sys.argv[0]), 'db_home') 0034 self.homeDir = homeDir 0035 try: os.mkdir(homeDir) 0036 except os.error: pass 0037 self.env = db.DBEnv() 0038 self.env.open(homeDir, db.DB_THREAD | db.DB_INIT_MPOOL | 0039 db.DB_INIT_LOCK | db.DB_CREATE) 0040 0041 0042 def tearDown(self): 0043 self.env.close() 0044 import glob 0045 files = glob.glob(os.path.join(self.homeDir, '*')) 0046 for file in files: 0047 os.remove(file) 0048 0049 0050 def test01_simple(self): 0051 if verbose: 0052 print '\n', '-=' * 30 0053 print "Running %s.test01_simple..." % self.__class__.__name__ 0054 0055 anID = self.env.lock_id() 0056 if verbose: 0057 print "locker ID: %s" % anID 0058 lock = self.env.lock_get(anID, "some locked thing", db.DB_LOCK_WRITE) 0059 if verbose: 0060 print "Aquired lock: %s" % lock 0061 time.sleep(1) 0062 self.env.lock_put(lock) 0063 if verbose: 0064 print "Released lock: %s" % lock 0065 0066 0067 0068 0069 def test02_threaded(self): 0070 if verbose: 0071 print '\n', '-=' * 30 0072 print "Running %s.test02_threaded..." % self.__class__.__name__ 0073 0074 threads = [] 0075 threads.append(Thread(target = self.theThread, 0076 args=(5, db.DB_LOCK_WRITE))) 0077 threads.append(Thread(target = self.theThread, 0078 args=(1, db.DB_LOCK_READ))) 0079 threads.append(Thread(target = self.theThread, 0080 args=(1, db.DB_LOCK_READ))) 0081 threads.append(Thread(target = self.theThread, 0082 args=(1, db.DB_LOCK_WRITE))) 0083 threads.append(Thread(target = self.theThread, 0084 args=(1, db.DB_LOCK_READ))) 0085 threads.append(Thread(target = self.theThread, 0086 args=(1, db.DB_LOCK_READ))) 0087 threads.append(Thread(target = self.theThread, 0088 args=(1, db.DB_LOCK_WRITE))) 0089 threads.append(Thread(target = self.theThread, 0090 args=(1, db.DB_LOCK_WRITE))) 0091 threads.append(Thread(target = self.theThread, 0092 args=(1, db.DB_LOCK_WRITE))) 0093 0094 for t in threads: 0095 t.start() 0096 for t in threads: 0097 t.join() 0098 0099 def test03_set_timeout(self): 0100 # test that the set_timeout call works 0101 if hasattr(self.env, 'set_timeout'): 0102 self.env.set_timeout(0, db.DB_SET_LOCK_TIMEOUT) 0103 self.env.set_timeout(0, db.DB_SET_TXN_TIMEOUT) 0104 self.env.set_timeout(123456, db.DB_SET_LOCK_TIMEOUT) 0105 self.env.set_timeout(7890123, db.DB_SET_TXN_TIMEOUT) 0106 0107 def theThread(self, sleepTime, lockType): 0108 name = currentThread().getName() 0109 if lockType == db.DB_LOCK_WRITE: 0110 lt = "write" 0111 else: 0112 lt = "read" 0113 0114 anID = self.env.lock_id() 0115 if verbose: 0116 print "%s: locker ID: %s" % (name, anID) 0117 0118 lock = self.env.lock_get(anID, "some locked thing", lockType) 0119 if verbose: 0120 print "%s: Aquired %s lock: %s" % (name, lt, lock) 0121 0122 time.sleep(sleepTime) 0123 0124 self.env.lock_put(lock) 0125 if verbose: 0126 print "%s: Released %s lock: %s" % (name, lt, lock) 0127 0128 0129 #---------------------------------------------------------------------- 0130 0131 def test_suite(): 0132 suite = unittest.TestSuite() 0133 0134 if have_threads: 0135 suite.addTest(unittest.makeSuite(LockingTestCase)) 0136 else: 0137 suite.addTest(unittest.makeSuite(LockingTestCase, 'test01')) 0138 0139 return suite 0140 0141 0142 if __name__ == '__main__': 0143 unittest.main(defaultTest='test_suite') 0144
Generated by PyXR 0.9.4