0001 """ 0002 TestCases for DB.associate. 0003 """ 0004 0005 import sys, os, string 0006 import tempfile 0007 import time 0008 from pprint import pprint 0009 0010 try: 0011 from threading import Thread, currentThread 0012 have_threads = 1 0013 except ImportError: 0014 have_threads = 0 0015 0016 import unittest 0017 from test_all import verbose 0018 0019 try: 0020 # For Pythons w/distutils pybsddb 0021 from bsddb3 import db, dbshelve 0022 except ImportError: 0023 # For Python 2.3 0024 from bsddb import db, dbshelve 0025 0026 0027 #---------------------------------------------------------------------- 0028 0029 0030 musicdata = { 0031 1 : ("Bad English", "The Price Of Love", "Rock"), 0032 2 : ("DNA featuring Suzanne Vega", "Tom's Diner", "Rock"), 0033 3 : ("George Michael", "Praying For Time", "Rock"), 0034 4 : ("Gloria Estefan", "Here We Are", "Rock"), 0035 5 : ("Linda Ronstadt", "Don't Know Much", "Rock"), 0036 6 : ("Michael Bolton", "How Am I Supposed To Live Without You", "Blues"), 0037 7 : ("Paul Young", "Oh Girl", "Rock"), 0038 8 : ("Paula Abdul", "Opposites Attract", "Rock"), 0039 9 : ("Richard Marx", "Should've Known Better", "Rock"), 0040 10: ("Rod Stewart", "Forever Young", "Rock"), 0041 11: ("Roxette", "Dangerous", "Rock"), 0042 12: ("Sheena Easton", "The Lover In Me", "Rock"), 0043 13: ("Sinead O'Connor", "Nothing Compares 2 U", "Rock"), 0044 14: ("Stevie B.", "Because I Love You", "Rock"), 0045 15: ("Taylor Dayne", "Love Will Lead You Back", "Rock"), 0046 16: ("The Bangles", "Eternal Flame", "Rock"), 0047 17: ("Wilson Phillips", "Release Me", "Rock"), 0048 18: ("Billy Joel", "Blonde Over Blue", "Rock"), 0049 19: ("Billy Joel", "Famous Last Words", "Rock"), 0050 20: ("Billy Joel", "Lullabye (Goodnight, My Angel)", "Rock"), 0051 21: ("Billy Joel", "The River Of Dreams", "Rock"), 0052 22: ("Billy Joel", "Two Thousand Years", "Rock"), 0053 23: ("Janet Jackson", "Alright", "Rock"), 0054 24: ("Janet Jackson", "Black Cat", "Rock"), 0055 25: ("Janet Jackson", "Come Back To Me", "Rock"), 0056 26: ("Janet Jackson", "Escapade", "Rock"), 0057 27: ("Janet Jackson", "Love Will Never Do (Without You)", "Rock"), 0058 28: ("Janet Jackson", "Miss You Much", "Rock"), 0059 29: ("Janet Jackson", "Rhythm Nation", "Rock"), 0060 30: ("Janet Jackson", "State Of The World", "Rock"), 0061 31: ("Janet Jackson", "The Knowledge", "Rock"), 0062 32: ("Spyro Gyra", "End of Romanticism", "Jazz"), 0063 33: ("Spyro Gyra", "Heliopolis", "Jazz"), 0064 34: ("Spyro Gyra", "Jubilee", "Jazz"), 0065 35: ("Spyro Gyra", "Little Linda", "Jazz"), 0066 36: ("Spyro Gyra", "Morning Dance", "Jazz"), 0067 37: ("Spyro Gyra", "Song for Lorraine", "Jazz"), 0068 38: ("Yes", "Owner Of A Lonely Heart", "Rock"), 0069 39: ("Yes", "Rhythm Of Love", "Rock"), 0070 40: ("Cusco", "Dream Catcher", "New Age"), 0071 41: ("Cusco", "Geronimos Laughter", "New Age"), 0072 42: ("Cusco", "Ghost Dance", "New Age"), 0073 43: ("Blue Man Group", "Drumbone", "New Age"), 0074 44: ("Blue Man Group", "Endless Column", "New Age"), 0075 45: ("Blue Man Group", "Klein Mandelbrot", "New Age"), 0076 46: ("Kenny G", "Silhouette", "Jazz"), 0077 47: ("Sade", "Smooth Operator", "Jazz"), 0078 48: ("David Arkenstone", "Papillon (On The Wings Of The Butterfly)", 0079 "New Age"), 0080 49: ("David Arkenstone", "Stepping Stars", "New Age"), 0081 50: ("David Arkenstone", "Carnation Lily Lily Rose", "New Age"), 0082 51: ("David Lanz", "Behind The Waterfall", "New Age"), 0083 52: ("David Lanz", "Cristofori's Dream", "New Age"), 0084 53: ("David Lanz", "Heartsounds", "New Age"), 0085 54: ("David Lanz", "Leaves on the Seine", "New Age"), 0086 99: ("unknown artist", "Unnamed song", "Unknown"), 0087 } 0088 0089 #---------------------------------------------------------------------- 0090 0091 0092 class AssociateTestCase(unittest.TestCase): 0093 keytype = '' 0094 0095 def setUp(self): 0096 self.filename = self.__class__.__name__ + '.db' 0097 homeDir = os.path.join(os.path.dirname(sys.argv[0]), 'db_home') 0098 self.homeDir = homeDir 0099 try: os.mkdir(homeDir) 0100 except os.error: pass 0101 self.env = db.DBEnv() 0102 self.env.open(homeDir, db.DB_CREATE | db.DB_INIT_MPOOL | 0103 db.DB_INIT_LOCK | db.DB_THREAD) 0104 0105 def tearDown(self): 0106 self.closeDB() 0107 self.env.close() 0108 import glob 0109 files = glob.glob(os.path.join(self.homeDir, '*')) 0110 for file in files: 0111 os.remove(file) 0112 0113 def addDataToDB(self, d): 0114 for key, value in musicdata.items(): 0115 if type(self.keytype) == type(''): 0116 key = "%02d" % key 0117 d.put(key, string.join(value, '|')) 0118 0119 def createDB(self): 0120 self.primary = db.DB(self.env) 0121 self.primary.set_get_returns_none(2) 0122 self.primary.open(self.filename, "primary", self.dbtype, 0123 db.DB_CREATE | db.DB_THREAD) 0124 0125 def closeDB(self): 0126 self.primary.close() 0127 0128 def getDB(self): 0129 return self.primary 0130 0131 def test01_associateWithDB(self): 0132 if verbose: 0133 print '\n', '-=' * 30 0134 print "Running %s.test01_associateWithDB..." % \ 0135 self.__class__.__name__ 0136 0137 self.createDB() 0138 0139 secDB = db.DB(self.env) 0140 secDB.set_flags(db.DB_DUP) 0141 secDB.set_get_returns_none(2) 0142 secDB.open(self.filename, "secondary", db.DB_BTREE, 0143 db.DB_CREATE | db.DB_THREAD) 0144 self.getDB().associate(secDB, self.getGenre) 0145 0146 self.addDataToDB(self.getDB()) 0147 0148 self.finish_test(secDB) 0149 0150 0151 def test02_associateAfterDB(self): 0152 if verbose: 0153 print '\n', '-=' * 30 0154 print "Running %s.test02_associateAfterDB..." % \ 0155 self.__class__.__name__ 0156 0157 self.createDB() 0158 self.addDataToDB(self.getDB()) 0159 0160 secDB = db.DB(self.env) 0161 secDB.set_flags(db.DB_DUP) 0162 secDB.open(self.filename, "secondary", db.DB_BTREE, 0163 db.DB_CREATE | db.DB_THREAD) 0164 0165 # adding the DB_CREATE flag will cause it to index existing records 0166 self.getDB().associate(secDB, self.getGenre, db.DB_CREATE) 0167 0168 self.finish_test(secDB) 0169 0170 0171 def finish_test(self, secDB): 0172 # 'Blues' should not be in the secondary database 0173 vals = secDB.pget('Blues') 0174 assert vals == None, vals 0175 0176 vals = secDB.pget('Unknown') 0177 assert vals[0] == 99 or vals[0] == '99', vals 0178 vals[1].index('Unknown') 0179 vals[1].index('Unnamed') 0180 vals[1].index('unknown') 0181 0182 if verbose: 0183 print "Primary key traversal:" 0184 c = self.getDB().cursor() 0185 count = 0 0186 rec = c.first() 0187 while rec is not None: 0188 if type(self.keytype) == type(''): 0189 assert string.atoi(rec[0]) # for primary db, key is a number 0190 else: 0191 assert rec[0] and type(rec[0]) == type(0) 0192 count = count + 1 0193 if verbose: 0194 print rec 0195 rec = c.next() 0196 assert count == len(musicdata) # all items accounted for 0197 0198 0199 if verbose: 0200 print "Secondary key traversal:" 0201 c = secDB.cursor() 0202 count = 0 0203 0204 # test cursor pget 0205 vals = c.pget('Unknown', flags=db.DB_LAST) 0206 assert vals[1] == 99 or vals[1] == '99', vals 0207 assert vals[0] == 'Unknown' 0208 vals[2].index('Unknown') 0209 vals[2].index('Unnamed') 0210 vals[2].index('unknown') 0211 0212 vals = c.pget('Unknown', data='wrong value', flags=db.DB_GET_BOTH) 0213 assert vals == None, vals 0214 0215 rec = c.first() 0216 assert rec[0] == "Jazz" 0217 while rec is not None: 0218 count = count + 1 0219 if verbose: 0220 print rec 0221 rec = c.next() 0222 # all items accounted for EXCEPT for 1 with "Blues" genre 0223 assert count == len(musicdata)-1 0224 0225 def getGenre(self, priKey, priData): 0226 assert type(priData) == type("") 0227 if verbose: 0228 print 'getGenre key: %r data: %r' % (priKey, priData) 0229 genre = string.split(priData, '|')[2] 0230 if genre == 'Blues': 0231 return db.DB_DONOTINDEX 0232 else: 0233 return genre 0234 0235 0236 #---------------------------------------------------------------------- 0237 0238 0239 class AssociateHashTestCase(AssociateTestCase): 0240 dbtype = db.DB_HASH 0241 0242 class AssociateBTreeTestCase(AssociateTestCase): 0243 dbtype = db.DB_BTREE 0244 0245 class AssociateRecnoTestCase(AssociateTestCase): 0246 dbtype = db.DB_RECNO 0247 keytype = 0 0248 0249 0250 #---------------------------------------------------------------------- 0251 0252 class ShelveAssociateTestCase(AssociateTestCase): 0253 0254 def createDB(self): 0255 self.primary = dbshelve.open(self.filename, 0256 dbname="primary", 0257 dbenv=self.env, 0258 filetype=self.dbtype) 0259 0260 def addDataToDB(self, d): 0261 for key, value in musicdata.items(): 0262 if type(self.keytype) == type(''): 0263 key = "%02d" % key 0264 d.put(key, value) # save the value as is this time 0265 0266 0267 def getGenre(self, priKey, priData): 0268 assert type(priData) == type(()) 0269 if verbose: 0270 print 'getGenre key: %r data: %r' % (priKey, priData) 0271 genre = priData[2] 0272 if genre == 'Blues': 0273 return db.DB_DONOTINDEX 0274 else: 0275 return genre 0276 0277 0278 class ShelveAssociateHashTestCase(ShelveAssociateTestCase): 0279 dbtype = db.DB_HASH 0280 0281 class ShelveAssociateBTreeTestCase(ShelveAssociateTestCase): 0282 dbtype = db.DB_BTREE 0283 0284 class ShelveAssociateRecnoTestCase(ShelveAssociateTestCase): 0285 dbtype = db.DB_RECNO 0286 keytype = 0 0287 0288 0289 #---------------------------------------------------------------------- 0290 0291 class ThreadedAssociateTestCase(AssociateTestCase): 0292 0293 def addDataToDB(self, d): 0294 t1 = Thread(target = self.writer1, 0295 args = (d, )) 0296 t2 = Thread(target = self.writer2, 0297 args = (d, )) 0298 0299 t1.start() 0300 t2.start() 0301 t1.join() 0302 t2.join() 0303 0304 def writer1(self, d): 0305 for key, value in musicdata.items(): 0306 if type(self.keytype) == type(''): 0307 key = "%02d" % key 0308 d.put(key, string.join(value, '|')) 0309 0310 def writer2(self, d): 0311 for x in range(100, 600): 0312 key = 'z%2d' % x 0313 value = [key] * 4 0314 d.put(key, string.join(value, '|')) 0315 0316 0317 class ThreadedAssociateHashTestCase(ShelveAssociateTestCase): 0318 dbtype = db.DB_HASH 0319 0320 class ThreadedAssociateBTreeTestCase(ShelveAssociateTestCase): 0321 dbtype = db.DB_BTREE 0322 0323 class ThreadedAssociateRecnoTestCase(ShelveAssociateTestCase): 0324 dbtype = db.DB_RECNO 0325 keytype = 0 0326 0327 0328 #---------------------------------------------------------------------- 0329 0330 def test_suite(): 0331 suite = unittest.TestSuite() 0332 0333 if db.version() >= (3, 3, 11): 0334 suite.addTest(unittest.makeSuite(AssociateHashTestCase)) 0335 suite.addTest(unittest.makeSuite(AssociateBTreeTestCase)) 0336 suite.addTest(unittest.makeSuite(AssociateRecnoTestCase)) 0337 0338 suite.addTest(unittest.makeSuite(ShelveAssociateHashTestCase)) 0339 suite.addTest(unittest.makeSuite(ShelveAssociateBTreeTestCase)) 0340 suite.addTest(unittest.makeSuite(ShelveAssociateRecnoTestCase)) 0341 0342 if have_threads: 0343 suite.addTest(unittest.makeSuite(ThreadedAssociateHashTestCase)) 0344 suite.addTest(unittest.makeSuite(ThreadedAssociateBTreeTestCase)) 0345 suite.addTest(unittest.makeSuite(ThreadedAssociateRecnoTestCase)) 0346 0347 return suite 0348 0349 0350 if __name__ == '__main__': 0351 unittest.main(defaultTest='test_suite') 0352
Generated by PyXR 0.9.4