PyXR

c:\python24\lib\bsddb\test\test_thread.py



0001 """TestCases for multi-threaded access to a DB.
0002 """
0003 
0004 import os
0005 import sys
0006 import time
0007 import errno
0008 import shutil
0009 import tempfile
0010 from pprint import pprint
0011 from random import random
0012 
0013 try:
0014     True, False
0015 except NameError:
0016     True = 1
0017     False = 0
0018 
0019 DASH = '-'
0020 
0021 try:
0022     from threading import Thread, currentThread
0023     have_threads = True
0024 except ImportError:
0025     have_threads = False
0026 
0027 import unittest
0028 from test_all import verbose
0029 
0030 try:
0031     # For Pythons w/distutils pybsddb
0032     from bsddb3 import db, dbutils
0033 except ImportError:
0034     # For Python 2.3
0035     from bsddb import db, dbutils
0036 
0037 
0038 #----------------------------------------------------------------------
0039 
0040 class BaseThreadedTestCase(unittest.TestCase):
0041     dbtype       = db.DB_UNKNOWN  # must be set in derived class
0042     dbopenflags  = 0
0043     dbsetflags   = 0
0044     envflags     = 0
0045 
0046     def setUp(self):
0047         if verbose:
0048             dbutils._deadlock_VerboseFile = sys.stdout
0049 
0050         homeDir = os.path.join(os.path.dirname(sys.argv[0]), 'db_home')
0051         self.homeDir = homeDir
0052         try:
0053             os.mkdir(homeDir)
0054         except OSError, e:
0055             if e.errno <> errno.EEXIST: raise
0056         self.env = db.DBEnv()
0057         self.setEnvOpts()
0058         self.env.open(homeDir, self.envflags | db.DB_CREATE)
0059 
0060         self.filename = self.__class__.__name__ + '.db'
0061         self.d = db.DB(self.env)
0062         if self.dbsetflags:
0063             self.d.set_flags(self.dbsetflags)
0064         self.d.open(self.filename, self.dbtype, self.dbopenflags|db.DB_CREATE)
0065 
0066     def tearDown(self):
0067         self.d.close()
0068         self.env.close()
0069         shutil.rmtree(self.homeDir)
0070 
0071     def setEnvOpts(self):
0072         pass
0073 
0074     def makeData(self, key):
0075         return DASH.join([key] * 5)
0076 
0077 
0078 #----------------------------------------------------------------------
0079 
0080 
0081 class ConcurrentDataStoreBase(BaseThreadedTestCase):
0082     dbopenflags = db.DB_THREAD
0083     envflags    = db.DB_THREAD | db.DB_INIT_CDB | db.DB_INIT_MPOOL
0084     readers     = 0 # derived class should set
0085     writers     = 0
0086     records     = 1000
0087 
0088     def test01_1WriterMultiReaders(self):
0089         if verbose:
0090             print '\n', '-=' * 30
0091             print "Running %s.test01_1WriterMultiReaders..." % \
0092                   self.__class__.__name__
0093 
0094         threads = []
0095         for x in range(self.writers):
0096             wt = Thread(target = self.writerThread,
0097                         args = (self.d, self.records, x),
0098                         name = 'writer %d' % x,
0099                         )#verbose = verbose)
0100             threads.append(wt)
0101 
0102         for x in range(self.readers):
0103             rt = Thread(target = self.readerThread,
0104                         args = (self.d, x),
0105                         name = 'reader %d' % x,
0106                         )#verbose = verbose)
0107             threads.append(rt)
0108 
0109         for t in threads:
0110             t.start()
0111         for t in threads:
0112             t.join()
0113 
0114     def writerThread(self, d, howMany, writerNum):
0115         #time.sleep(0.01 * writerNum + 0.01)
0116         name = currentThread().getName()
0117         start = howMany * writerNum
0118         stop = howMany * (writerNum + 1) - 1
0119         if verbose:
0120             print "%s: creating records %d - %d" % (name, start, stop)
0121 
0122         for x in range(start, stop):
0123             key = '%04d' % x
0124             dbutils.DeadlockWrap(d.put, key, self.makeData(key),
0125                                  max_retries=12)
0126             if verbose and x % 100 == 0:
0127                 print "%s: records %d - %d finished" % (name, start, x)
0128 
0129         if verbose:
0130             print "%s: finished creating records" % name
0131 
0132 ##         # Each write-cursor will be exclusive, the only one that can update the DB...
0133 ##         if verbose: print "%s: deleting a few records" % name
0134 ##         c = d.cursor(flags = db.DB_WRITECURSOR)
0135 ##         for x in range(10):
0136 ##             key = int(random() * howMany) + start
0137 ##             key = '%04d' % key
0138 ##             if d.has_key(key):
0139 ##                 c.set(key)
0140 ##                 c.delete()
0141 
0142 ##         c.close()
0143         if verbose:
0144             print "%s: thread finished" % name
0145 
0146     def readerThread(self, d, readerNum):
0147         time.sleep(0.01 * readerNum)
0148         name = currentThread().getName()
0149 
0150         for loop in range(5):
0151             c = d.cursor()
0152             count = 0
0153             rec = c.first()
0154             while rec:
0155                 count += 1
0156                 key, data = rec
0157                 self.assertEqual(self.makeData(key), data)
0158                 rec = c.next()
0159             if verbose:
0160                 print "%s: found %d records" % (name, count)
0161             c.close()
0162             time.sleep(0.05)
0163 
0164         if verbose:
0165             print "%s: thread finished" % name
0166 
0167 
class BTreeConcurrentDataStore(ConcurrentDataStoreBase):
    """ConcurrentDataStoreBase run on a DB_BTREE database with 2 writers
    and 10 readers."""
    dbtype = db.DB_BTREE
    writers, readers, records = 2, 10, 1000
0173 
0174 
class HashConcurrentDataStore(ConcurrentDataStoreBase):
    """ConcurrentDataStoreBase run on a DB_HASH database with 2 writers
    and 10 readers."""
    dbtype = db.DB_HASH
    writers, readers, records = 2, 10, 1000
0180 
0181 
0182 #----------------------------------------------------------------------
0183 
0184 class SimpleThreadedBase(BaseThreadedTestCase):
0185     dbopenflags = db.DB_THREAD
0186     envflags    = db.DB_THREAD | db.DB_INIT_MPOOL | db.DB_INIT_LOCK
0187     readers = 5
0188     writers = 3
0189     records = 1000
0190 
0191     def setEnvOpts(self):
0192         self.env.set_lk_detect(db.DB_LOCK_DEFAULT)
0193 
0194     def test02_SimpleLocks(self):
0195         if verbose:
0196             print '\n', '-=' * 30
0197             print "Running %s.test02_SimpleLocks..." % self.__class__.__name__
0198 
0199         threads = []
0200         for x in range(self.writers):
0201             wt = Thread(target = self.writerThread,
0202                         args = (self.d, self.records, x),
0203                         name = 'writer %d' % x,
0204                         )#verbose = verbose)
0205             threads.append(wt)
0206         for x in range(self.readers):
0207             rt = Thread(target = self.readerThread,
0208                         args = (self.d, x),
0209                         name = 'reader %d' % x,
0210                         )#verbose = verbose)
0211             threads.append(rt)
0212 
0213         for t in threads:
0214             t.start()
0215         for t in threads:
0216             t.join()
0217 
0218     def writerThread(self, d, howMany, writerNum):
0219         name = currentThread().getName()
0220         start = howMany * writerNum
0221         stop = howMany * (writerNum + 1) - 1
0222         if verbose:
0223             print "%s: creating records %d - %d" % (name, start, stop)
0224 
0225         # create a bunch of records
0226         for x in xrange(start, stop):
0227             key = '%04d' % x
0228             dbutils.DeadlockWrap(d.put, key, self.makeData(key),
0229                                  max_retries=12)
0230 
0231             if verbose and x % 100 == 0:
0232                 print "%s: records %d - %d finished" % (name, start, x)
0233 
0234             # do a bit or reading too
0235             if random() <= 0.05:
0236                 for y in xrange(start, x):
0237                     key = '%04d' % x
0238                     data = dbutils.DeadlockWrap(d.get, key, max_retries=12)
0239                     self.assertEqual(data, self.makeData(key))
0240 
0241         # flush them
0242         try:
0243             dbutils.DeadlockWrap(d.sync, max_retries=12)
0244         except db.DBIncompleteError, val:
0245             if verbose:
0246                 print "could not complete sync()..."
0247 
0248         # read them back, deleting a few
0249         for x in xrange(start, stop):
0250             key = '%04d' % x
0251             data = dbutils.DeadlockWrap(d.get, key, max_retries=12)
0252             if verbose and x % 100 == 0:
0253                 print "%s: fetched record (%s, %s)" % (name, key, data)
0254             self.assertEqual(data, self.makeData(key))
0255             if random() <= 0.10:
0256                 dbutils.DeadlockWrap(d.delete, key, max_retries=12)
0257                 if verbose:
0258                     print "%s: deleted record %s" % (name, key)
0259 
0260         if verbose:
0261             print "%s: thread finished" % name
0262 
0263     def readerThread(self, d, readerNum):
0264         time.sleep(0.01 * readerNum)
0265         name = currentThread().getName()
0266 
0267         for loop in range(5):
0268             c = d.cursor()
0269             count = 0
0270             rec = dbutils.DeadlockWrap(c.first, max_retries=10)
0271             while rec:
0272                 count += 1
0273                 key, data = rec
0274                 self.assertEqual(self.makeData(key), data)
0275                 rec = dbutils.DeadlockWrap(c.next, max_retries=10)
0276             if verbose:
0277                 print "%s: found %d records" % (name, count)
0278             c.close()
0279             time.sleep(0.05)
0280 
0281         if verbose:
0282             print "%s: thread finished" % name
0283 
0284 
class BTreeSimpleThreaded(SimpleThreadedBase):
    """SimpleThreadedBase with dbtype set to DB_BTREE."""
    dbtype = db.DB_BTREE
0287 
0288 
class HashSimpleThreaded(SimpleThreadedBase):
    """SimpleThreadedBase with dbtype set to DB_HASH."""
    dbtype = db.DB_HASH
0291 
0292 
0293 #----------------------------------------------------------------------
0294 
0295 
0296 class ThreadedTransactionsBase(BaseThreadedTestCase):
0297     dbopenflags = db.DB_THREAD | db.DB_AUTO_COMMIT
0298     envflags    = (db.DB_THREAD |
0299                    db.DB_INIT_MPOOL |
0300                    db.DB_INIT_LOCK |
0301                    db.DB_INIT_LOG |
0302                    db.DB_INIT_TXN
0303                    )
0304     readers = 0
0305     writers = 0
0306     records = 2000
0307     txnFlag = 0
0308 
0309     def setEnvOpts(self):
0310         #self.env.set_lk_detect(db.DB_LOCK_DEFAULT)
0311         pass
0312 
0313     def test03_ThreadedTransactions(self):
0314         if verbose:
0315             print '\n', '-=' * 30
0316             print "Running %s.test03_ThreadedTransactions..." % \
0317                   self.__class__.__name__
0318 
0319         threads = []
0320         for x in range(self.writers):
0321             wt = Thread(target = self.writerThread,
0322                         args = (self.d, self.records, x),
0323                         name = 'writer %d' % x,
0324                         )#verbose = verbose)
0325             threads.append(wt)
0326 
0327         for x in range(self.readers):
0328             rt = Thread(target = self.readerThread,
0329                         args = (self.d, x),
0330                         name = 'reader %d' % x,
0331                         )#verbose = verbose)
0332             threads.append(rt)
0333 
0334         dt = Thread(target = self.deadlockThread)
0335         dt.start()
0336 
0337         for t in threads:
0338             t.start()
0339         for t in threads:
0340             t.join()
0341 
0342         self.doLockDetect = False
0343         dt.join()
0344 
0345     def doWrite(self, d, name, start, stop):
0346         finished = False
0347         while not finished:
0348             try:
0349                 txn = self.env.txn_begin(None, self.txnFlag)
0350                 for x in range(start, stop):
0351                     key = '%04d' % x
0352                     d.put(key, self.makeData(key), txn)
0353                     if verbose and x % 100 == 0:
0354                         print "%s: records %d - %d finished" % (name, start, x)
0355                 txn.commit()
0356                 finished = True
0357             except (db.DBLockDeadlockError, db.DBLockNotGrantedError), val:
0358                 if verbose:
0359                     print "%s: Aborting transaction (%s)" % (name, val[1])
0360                 txn.abort()
0361                 time.sleep(0.05)
0362 
0363     def writerThread(self, d, howMany, writerNum):
0364         name = currentThread().getName()
0365         start = howMany * writerNum
0366         stop = howMany * (writerNum + 1) - 1
0367         if verbose:
0368             print "%s: creating records %d - %d" % (name, start, stop)
0369 
0370         step = 100
0371         for x in range(start, stop, step):
0372             self.doWrite(d, name, x, min(stop, x+step))
0373 
0374         if verbose:
0375             print "%s: finished creating records" % name
0376         if verbose:
0377             print "%s: deleting a few records" % name
0378 
0379         finished = False
0380         while not finished:
0381             try:
0382                 recs = []
0383                 txn = self.env.txn_begin(None, self.txnFlag)
0384                 for x in range(10):
0385                     key = int(random() * howMany) + start
0386                     key = '%04d' % key
0387                     data = d.get(key, None, txn, db.DB_RMW)
0388                     if data is not None:
0389                         d.delete(key, txn)
0390                         recs.append(key)
0391                 txn.commit()
0392                 finished = True
0393                 if verbose:
0394                     print "%s: deleted records %s" % (name, recs)
0395             except (db.DBLockDeadlockError, db.DBLockNotGrantedError), val:
0396                 if verbose:
0397                     print "%s: Aborting transaction (%s)" % (name, val[1])
0398                 txn.abort()
0399                 time.sleep(0.05)
0400 
0401         if verbose:
0402             print "%s: thread finished" % name
0403 
0404     def readerThread(self, d, readerNum):
0405         time.sleep(0.01 * readerNum + 0.05)
0406         name = currentThread().getName()
0407 
0408         for loop in range(5):
0409             finished = False
0410             while not finished:
0411                 try:
0412                     txn = self.env.txn_begin(None, self.txnFlag)
0413                     c = d.cursor(txn)
0414                     count = 0
0415                     rec = c.first()
0416                     while rec:
0417                         count += 1
0418                         key, data = rec
0419                         self.assertEqual(self.makeData(key), data)
0420                         rec = c.next()
0421                     if verbose: print "%s: found %d records" % (name, count)
0422                     c.close()
0423                     txn.commit()
0424                     finished = True
0425                 except (db.DBLockDeadlockError, db.DBLockNotGrantedError), val:
0426                     if verbose:
0427                         print "%s: Aborting transaction (%s)" % (name, val[1])
0428                     c.close()
0429                     txn.abort()
0430                     time.sleep(0.05)
0431 
0432             time.sleep(0.05)
0433 
0434         if verbose:
0435             print "%s: thread finished" % name
0436 
0437     def deadlockThread(self):
0438         self.doLockDetect = True
0439         while self.doLockDetect:
0440             time.sleep(0.5)
0441             try:
0442                 aborted = self.env.lock_detect(
0443                     db.DB_LOCK_RANDOM, db.DB_LOCK_CONFLICT)
0444                 if verbose and aborted:
0445                     print "deadlock: Aborted %d deadlocked transaction(s)" \
0446                           % aborted
0447             except db.DBError:
0448                 pass
0449 
0450 
class BTreeThreadedTransactions(ThreadedTransactionsBase):
    """ThreadedTransactionsBase on a DB_BTREE database: 3 writers, 5 readers."""
    dbtype = db.DB_BTREE
    writers, readers, records = 3, 5, 2000
0456 
class HashThreadedTransactions(ThreadedTransactionsBase):
    """ThreadedTransactionsBase on a DB_HASH database: 1 writer, 5 readers."""
    dbtype = db.DB_HASH
    writers, readers, records = 1, 5, 2000
0462 
class BTreeThreadedNoWaitTransactions(ThreadedTransactionsBase):
    """ThreadedTransactionsBase on a DB_BTREE database with transactions
    begun with DB_TXN_NOWAIT: 3 writers, 5 readers."""
    dbtype = db.DB_BTREE
    txnFlag = db.DB_TXN_NOWAIT
    writers, readers, records = 3, 5, 2000
0469 
class HashThreadedNoWaitTransactions(ThreadedTransactionsBase):
    """ThreadedTransactionsBase on a DB_HASH database with transactions
    begun with DB_TXN_NOWAIT: 1 writer, 5 readers."""
    dbtype = db.DB_HASH
    txnFlag = db.DB_TXN_NOWAIT
    writers, readers, records = 1, 5, 2000
0476 
0477 
0478 #----------------------------------------------------------------------
0479 
0480 def test_suite():
0481     suite = unittest.TestSuite()
0482 
0483     if have_threads:
0484         suite.addTest(unittest.makeSuite(BTreeConcurrentDataStore))
0485         suite.addTest(unittest.makeSuite(HashConcurrentDataStore))
0486         suite.addTest(unittest.makeSuite(BTreeSimpleThreaded))
0487         suite.addTest(unittest.makeSuite(HashSimpleThreaded))
0488         suite.addTest(unittest.makeSuite(BTreeThreadedTransactions))
0489         suite.addTest(unittest.makeSuite(HashThreadedTransactions))
0490         suite.addTest(unittest.makeSuite(BTreeThreadedNoWaitTransactions))
0491         suite.addTest(unittest.makeSuite(HashThreadedNoWaitTransactions))
0492 
0493     else:
0494         print "Threads not available, skipping thread tests."
0495 
0496     return suite
0497 
0498 
0499 if __name__ == '__main__':
0500     unittest.main(defaultTest='test_suite')
0501 

Generated by PyXR 0.9.4
SourceForge.net Logo