"""TestCases for multi-threaded access to a DB.

Three families of tests are defined, each run against a BTree and a Hash
database:

* ConcurrentDataStoreBase  -- environment opened with DB_INIT_CDB
* SimpleThreadedBase       -- environment opened with DB_INIT_LOCK, calls
                              wrapped in dbutils.DeadlockWrap
* ThreadedTransactionsBase -- environment opened with DB_INIT_TXN, explicit
                              transactions retried on lock failures

The syntax used here is deliberately limited to forms that parse on both
old Python 2 interpreters and Python 3 (parenthesised single-argument
print, sys.exc_info() instead of an ``except X, e`` clause).
"""

import os
import sys
import time
import errno
import shutil
import tempfile
from pprint import pprint
from random import random

try:
    True, False
except NameError:
    # Interpreters without the boolean names get them installed through
    # globals(); a plain ``True = 1`` statement would be a syntax error on
    # interpreters where True/False are keywords.
    globals()['True'] = 1
    globals()['False'] = 0

DASH = '-'

try:
    from threading import Thread, currentThread
    have_threads = True
except ImportError:
    have_threads = False

import unittest
from test_all import verbose

try:
    # For Pythons w/distutils pybsddb
    from bsddb3 import db, dbutils
except ImportError:
    # For Python 2.3
    from bsddb import db, dbutils


#----------------------------------------------------------------------

class BaseThreadedTestCase(unittest.TestCase):
    """Shared fixture: one DBEnv plus one DB opened inside it.

    Derived classes choose the access method and flags through the class
    attributes below, and supply ``writerThread(d, howMany, writerNum)``
    and ``readerThread(d, readerNum)`` together with the ``writers``,
    ``readers`` and ``records`` counts used by _makeThreads().
    """
    dbtype       = db.DB_UNKNOWN  # must be set in derived class
    dbopenflags  = 0
    dbsetflags   = 0
    envflags     = 0

    def setUp(self):
        """Create the home directory, open the environment and the DB."""
        if verbose:
            dbutils._deadlock_VerboseFile = sys.stdout

        homeDir = os.path.join(os.path.dirname(sys.argv[0]), 'db_home')
        self.homeDir = homeDir
        try:
            os.mkdir(homeDir)
        except OSError:
            # An already existing directory is fine; anything else is not.
            if sys.exc_info()[1].errno != errno.EEXIST:
                raise
        self.env = db.DBEnv()
        self.setEnvOpts()
        self.env.open(homeDir, self.envflags | db.DB_CREATE)

        self.filename = self.__class__.__name__ + '.db'
        self.d = db.DB(self.env)
        if self.dbsetflags:
            self.d.set_flags(self.dbsetflags)
        self.d.open(self.filename, self.dbtype,
                    self.dbopenflags | db.DB_CREATE)

    def tearDown(self):
        """Close the DB and environment, then remove the home directory."""
        self.d.close()
        self.env.close()
        shutil.rmtree(self.homeDir)

    def setEnvOpts(self):
        """Hook called between DBEnv() and env.open(); no-op by default."""
        pass

    def makeData(self, key):
        """Return the value stored under *key*: the key five times,
        joined by DASH."""
        return DASH.join([key] * 5)

    def _makeThreads(self):
        """Build (without starting) the writer threads followed by the
        reader threads, as configured by self.writers / self.readers."""
        threads = []
        for x in range(self.writers):
            threads.append(Thread(target=self.writerThread,
                                  args=(self.d, self.records, x),
                                  name='writer %d' % x))
        for x in range(self.readers):
            threads.append(Thread(target=self.readerThread,
                                  args=(self.d, x),
                                  name='reader %d' % x))
        return threads

    def _runThreads(self, threads):
        """Start every thread, then wait for all of them to finish."""
        for t in threads:
            t.start()
        for t in threads:
            t.join()


#----------------------------------------------------------------------


class ConcurrentDataStoreBase(BaseThreadedTestCase):
    """Writers and readers sharing a DB in a DB_INIT_CDB environment."""
    dbopenflags = db.DB_THREAD
    envflags    = db.DB_THREAD | db.DB_INIT_CDB | db.DB_INIT_MPOOL
    readers     = 0  # derived class should set
    writers     = 0
    records     = 1000

    def test01_1WriterMultiReaders(self):
        if verbose:
            print('\n' + '-=' * 30)
            print("Running %s.test01_1WriterMultiReaders..." %
                  self.__class__.__name__)

        self._runThreads(self._makeThreads())

    def writerThread(self, d, howMany, writerNum):
        """Store this writer's slice of records in *d*.

        Keys are '%04d' % x for x in range(start, stop); since range()
        excludes its end, the last key written is stop - 1, although the
        verbose message prints stop itself.
        """
        #time.sleep(0.01 * writerNum + 0.01)
        name = currentThread().getName()
        start = howMany * writerNum
        stop = howMany * (writerNum + 1) - 1
        if verbose:
            print("%s: creating records %d - %d" % (name, start, stop))

        for x in range(start, stop):
            key = '%04d' % x
            dbutils.DeadlockWrap(d.put, key, self.makeData(key),
                                 max_retries=12)
            if verbose and x % 100 == 0:
                print("%s: records %d - %d finished" % (name, start, x))

        if verbose:
            print("%s: finished creating records" % name)

##         # Each write-cursor will be exclusive, the only one that can update the DB...
##         if verbose: print "%s: deleting a few records" % name
##         c = d.cursor(flags = db.DB_WRITECURSOR)
##         for x in range(10):
##             key = int(random() * howMany) + start
##             key = '%04d' % key
##             if d.has_key(key):
##                 c.set(key)
##                 c.delete()

##         c.close()
        if verbose:
            print("%s: thread finished" % name)

    def readerThread(self, d, readerNum):
        """Scan the whole DB five times, checking every record found."""
        time.sleep(0.01 * readerNum)
        name = currentThread().getName()

        for loop in range(5):
            c = d.cursor()
            try:
                count = 0
                rec = c.first()
                while rec:
                    count += 1
                    key, data = rec
                    self.assertEqual(self.makeData(key), data)
                    rec = c.next()
                if verbose:
                    print("%s: found %d records" % (name, count))
            finally:
                # Release the cursor even when a check above fails.
                c.close()
            time.sleep(0.05)

        if verbose:
            print("%s: thread finished" % name)


class BTreeConcurrentDataStore(ConcurrentDataStoreBase):
    dbtype  = db.DB_BTREE
    writers = 2
    readers = 10
    records = 1000


class HashConcurrentDataStore(ConcurrentDataStoreBase):
    dbtype  = db.DB_HASH
    writers = 2
    readers = 10
    records = 1000


#----------------------------------------------------------------------

class SimpleThreadedBase(BaseThreadedTestCase):
    """Writers and readers in a DB_INIT_LOCK environment; every DB call
    goes through dbutils.DeadlockWrap."""
    dbopenflags = db.DB_THREAD
    envflags    = db.DB_THREAD | db.DB_INIT_MPOOL | db.DB_INIT_LOCK
    readers = 5
    writers = 3
    records = 1000

    def setEnvOpts(self):
        self.env.set_lk_detect(db.DB_LOCK_DEFAULT)

    def test02_SimpleLocks(self):
        if verbose:
            print('\n' + '-=' * 30)
            print("Running %s.test02_SimpleLocks..." %
                  self.__class__.__name__)

        self._runThreads(self._makeThreads())

    def writerThread(self, d, howMany, writerNum):
        """Write this writer's slice, occasionally re-reading what has
        been written so far, then sync, read everything back and delete
        roughly one record in ten.

        As in the other writers, range(start, stop) excludes stop, so the
        last key handled is stop - 1.
        """
        name = currentThread().getName()
        start = howMany * writerNum
        stop = howMany * (writerNum + 1) - 1
        if verbose:
            print("%s: creating records %d - %d" % (name, start, stop))

        # create a bunch of records
        for x in range(start, stop):
            key = '%04d' % x
            dbutils.DeadlockWrap(d.put, key, self.makeData(key),
                                 max_retries=12)

            if verbose and x % 100 == 0:
                print("%s: records %d - %d finished" % (name, start, x))

            # do a bit of reading too
            if random() <= 0.05:
                for y in range(start, x):
                    # The key must come from y, the record being re-read;
                    # formatting x here would fetch one record repeatedly.
                    key = '%04d' % y
                    data = dbutils.DeadlockWrap(d.get, key, max_retries=12)
                    self.assertEqual(data, self.makeData(key))

        # flush them
        try:
            dbutils.DeadlockWrap(d.sync, max_retries=12)
        except db.DBIncompleteError:
            if verbose:
                print("could not complete sync()...")

        # read them back, deleting a few
        for x in range(start, stop):
            key = '%04d' % x
            data = dbutils.DeadlockWrap(d.get, key, max_retries=12)
            if verbose and x % 100 == 0:
                print("%s: fetched record (%s, %s)" % (name, key, data))
            self.assertEqual(data, self.makeData(key))
            if random() <= 0.10:
                dbutils.DeadlockWrap(d.delete, key, max_retries=12)
                if verbose:
                    print("%s: deleted record %s" % (name, key))

        if verbose:
            print("%s: thread finished" % name)

    def readerThread(self, d, readerNum):
        """Scan the whole DB five times, checking every record found."""
        time.sleep(0.01 * readerNum)
        name = currentThread().getName()

        for loop in range(5):
            c = d.cursor()
            try:
                count = 0
                rec = dbutils.DeadlockWrap(c.first, max_retries=10)
                while rec:
                    count += 1
                    key, data = rec
                    self.assertEqual(self.makeData(key), data)
                    rec = dbutils.DeadlockWrap(c.next, max_retries=10)
                if verbose:
                    print("%s: found %d records" % (name, count))
            finally:
                # Release the cursor even when a check above fails.
                c.close()
            time.sleep(0.05)

        if verbose:
            print("%s: thread finished" % name)


class BTreeSimpleThreaded(SimpleThreadedBase):
    dbtype = db.DB_BTREE


class HashSimpleThreaded(SimpleThreadedBase):
    dbtype = db.DB_HASH


#----------------------------------------------------------------------


class ThreadedTransactionsBase(BaseThreadedTestCase):
    """Writers and readers using explicit transactions, with a helper
    thread that periodically runs the environment's lock detector."""
    dbopenflags = db.DB_THREAD | db.DB_AUTO_COMMIT
    envflags    = (db.DB_THREAD     |
                   db.DB_INIT_MPOOL |
                   db.DB_INIT_LOCK  |
                   db.DB_INIT_LOG   |
                   db.DB_INIT_TXN
                   )
    readers = 0
    writers = 0
    records = 2000
    txnFlag = 0

    # Polled by deadlockThread(); raised and cleared by the test method.
    doLockDetect = False

    def setEnvOpts(self):
        #self.env.set_lk_detect(db.DB_LOCK_DEFAULT)
        pass

    def test03_ThreadedTransactions(self):
        if verbose:
            print('\n' + '-=' * 30)
            print("Running %s.test03_ThreadedTransactions..." %
                  self.__class__.__name__)

        threads = self._makeThreads()

        # Raise the flag here, before the detector thread exists.  If the
        # thread set it itself, a late start could overwrite the stop
        # request below and dt.join() would wait forever.
        self.doLockDetect = True
        dt = Thread(target=self.deadlockThread)
        dt.start()

        try:
            self._runThreads(threads)
        finally:
            # Always stop the detector, even if a worker could not start.
            self.doLockDetect = False
            dt.join()

    def _abortAndBackOff(self, name, txn):
        """Called from a lock-failure handler: report the exception being
        handled, abort *txn* if one was begun, then pause before retry."""
        if verbose:
            val = sys.exc_info()[1]
            print("%s: Aborting transaction (%s)" % (name, val.args[1]))
        if txn is not None:
            txn.abort()
        time.sleep(0.05)

    def doWrite(self, d, name, start, stop):
        """Put records start .. stop - 1 in one transaction, retrying the
        whole batch until it commits."""
        finished = False
        while not finished:
            # Reset per attempt so the handler never aborts a transaction
            # left over from an earlier iteration.
            txn = None
            try:
                txn = self.env.txn_begin(None, self.txnFlag)
                for x in range(start, stop):
                    key = '%04d' % x
                    d.put(key, self.makeData(key), txn)
                    if verbose and x % 100 == 0:
                        print("%s: records %d - %d finished" %
                              (name, start, x))
                txn.commit()
                finished = True
            except (db.DBLockDeadlockError, db.DBLockNotGrantedError):
                self._abortAndBackOff(name, txn)

    def writerThread(self, d, howMany, writerNum):
        """Write this writer's slice in batches of 100 via doWrite(), then
        delete up to ten randomly chosen records in one transaction."""
        name = currentThread().getName()
        start = howMany * writerNum
        stop = howMany * (writerNum + 1) - 1
        if verbose:
            print("%s: creating records %d - %d" % (name, start, stop))

        step = 100
        for x in range(start, stop, step):
            self.doWrite(d, name, x, min(stop, x + step))

        if verbose:
            print("%s: finished creating records" % name)
        if verbose:
            print("%s: deleting a few records" % name)

        finished = False
        while not finished:
            txn = None
            try:
                recs = []
                txn = self.env.txn_begin(None, self.txnFlag)
                for x in range(10):
                    key = int(random() * howMany) + start
                    key = '%04d' % key
                    data = d.get(key, None, txn, db.DB_RMW)
                    if data is not None:
                        d.delete(key, txn)
                        recs.append(key)
                txn.commit()
                finished = True
                if verbose:
                    print("%s: deleted records %s" % (name, recs))
            except (db.DBLockDeadlockError, db.DBLockNotGrantedError):
                self._abortAndBackOff(name, txn)

        if verbose:
            print("%s: thread finished" % name)

    def readerThread(self, d, readerNum):
        """Scan the whole DB five times, each scan inside its own
        transaction that is retried on lock failures."""
        time.sleep(0.01 * readerNum + 0.05)
        name = currentThread().getName()

        for loop in range(5):
            finished = False
            while not finished:
                # Reset per attempt: the handler must only touch a cursor
                # and transaction that this attempt actually created.
                txn = None
                c = None
                try:
                    txn = self.env.txn_begin(None, self.txnFlag)
                    c = d.cursor(txn)
                    count = 0
                    rec = c.first()
                    while rec:
                        count += 1
                        key, data = rec
                        self.assertEqual(self.makeData(key), data)
                        rec = c.next()
                    if verbose:
                        print("%s: found %d records" % (name, count))
                    c.close()
                    c = None    # already closed; nothing left for handler
                    txn.commit()
                    finished = True
                except (db.DBLockDeadlockError, db.DBLockNotGrantedError):
                    if c is not None:
                        c.close()
                    self._abortAndBackOff(name, txn)

            time.sleep(0.05)

        if verbose:
            print("%s: thread finished" % name)

    def deadlockThread(self):
        """Run the lock detector every half second while doLockDetect is
        true.  The flag is owned by the test method, not by this thread."""
        while self.doLockDetect:
            time.sleep(0.5)
            try:
                aborted = self.env.lock_detect(
                    db.DB_LOCK_RANDOM, db.DB_LOCK_CONFLICT)
                if verbose and aborted:
                    print("deadlock: Aborted %d deadlocked transaction(s)"
                          % aborted)
            except db.DBError:
                # Best effort: a failed detection pass is simply retried
                # on the next iteration.
                pass


class BTreeThreadedTransactions(ThreadedTransactionsBase):
    dbtype = db.DB_BTREE
    writers = 3
    readers = 5
    records = 2000

class HashThreadedTransactions(ThreadedTransactionsBase):
    dbtype = db.DB_HASH
    writers = 1
    readers = 5
    records = 2000

class BTreeThreadedNoWaitTransactions(ThreadedTransactionsBase):
    dbtype = db.DB_BTREE
    writers = 3
    readers = 5
    records = 2000
    txnFlag = db.DB_TXN_NOWAIT

class HashThreadedNoWaitTransactions(ThreadedTransactionsBase):
    dbtype = db.DB_HASH
    writers = 1
    readers = 5
    records = 2000
    txnFlag = db.DB_TXN_NOWAIT


#----------------------------------------------------------------------

def test_suite():
    """Return a suite holding every threaded test case, or an empty suite
    (with a notice printed) when the threading module is unavailable."""
    suite = unittest.TestSuite()

    if have_threads:
        for case in (BTreeConcurrentDataStore,
                     HashConcurrentDataStore,
                     BTreeSimpleThreaded,
                     HashSimpleThreaded,
                     BTreeThreadedTransactions,
                     HashThreadedTransactions,
                     BTreeThreadedNoWaitTransactions,
                     HashThreadedNoWaitTransactions):
            suite.addTest(unittest.makeSuite(case))
    else:
        print("Threads not available, skipping thread tests.")

    return suite


if __name__ == '__main__':
    unittest.main(defaultTest='test_suite')

# (Source text recovered from a listing generated by PyXR 0.9.4.)