0001 """TestCases for exercising a Recno DB. 0002 """ 0003 0004 import os 0005 import sys 0006 import errno 0007 import tempfile 0008 from pprint import pprint 0009 import unittest 0010 0011 from test_all import verbose 0012 0013 try: 0014 # For Pythons w/distutils pybsddb 0015 from bsddb3 import db 0016 except ImportError: 0017 # For Python 2.3 0018 from bsddb import db 0019 0020 letters = 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ' 0021 0022 0023 #---------------------------------------------------------------------- 0024 0025 class SimpleRecnoTestCase(unittest.TestCase): 0026 def setUp(self): 0027 self.filename = tempfile.mktemp() 0028 0029 def tearDown(self): 0030 try: 0031 os.remove(self.filename) 0032 except OSError, e: 0033 if e.errno <> errno.EEXIST: raise 0034 0035 def test01_basic(self): 0036 d = db.DB() 0037 d.open(self.filename, db.DB_RECNO, db.DB_CREATE) 0038 0039 for x in letters: 0040 recno = d.append(x * 60) 0041 assert type(recno) == type(0) 0042 assert recno >= 1 0043 if verbose: 0044 print recno, 0045 0046 if verbose: print 0047 0048 stat = d.stat() 0049 if verbose: 0050 pprint(stat) 0051 0052 for recno in range(1, len(d)+1): 0053 data = d[recno] 0054 if verbose: 0055 print data 0056 0057 assert type(data) == type("") 0058 assert data == d.get(recno) 0059 0060 try: 0061 data = d[0] # This should raise a KeyError!?!?! 0062 except db.DBInvalidArgError, val: 0063 assert val[0] == db.EINVAL 0064 if verbose: print val 0065 else: 0066 self.fail("expected exception") 0067 0068 try: 0069 data = d[100] 0070 except KeyError: 0071 pass 0072 else: 0073 self.fail("expected exception") 0074 0075 data = d.get(100) 0076 assert data == None 0077 0078 keys = d.keys() 0079 if verbose: 0080 print keys 0081 assert type(keys) == type([]) 0082 assert type(keys[0]) == type(123) 0083 assert len(keys) == len(d) 0084 0085 items = d.items() 0086 if verbose: 0087 pprint(items) 0088 assert type(items) == type([]) 0089 assert type(items[0]) == type(()) 0090 assert len(items[0]) == 2 0091 assert type(items[0][0]) == type(123) 0092 assert type(items[0][1]) == type("") 0093 assert len(items) == len(d) 0094 0095 assert d.has_key(25) 0096 0097 del d[25] 0098 assert not d.has_key(25) 0099 0100 d.delete(13) 0101 assert not d.has_key(13) 0102 0103 data = d.get_both(26, "z" * 60) 0104 assert data == "z" * 60 0105 if verbose: 0106 print data 0107 0108 fd = d.fd() 0109 if verbose: 0110 print fd 0111 0112 c = d.cursor() 0113 rec = c.first() 0114 while rec: 0115 if verbose: 0116 print rec 0117 rec = c.next() 0118 0119 c.set(50) 0120 rec = c.current() 0121 if verbose: 0122 print rec 0123 0124 c.put(-1, "a replacement record", db.DB_CURRENT) 0125 0126 c.set(50) 0127 rec = c.current() 0128 assert rec == (50, "a replacement record") 0129 if verbose: 0130 print rec 0131 0132 rec = c.set_range(30) 0133 if verbose: 0134 print rec 0135 0136 # test that non-existant key lookups work (and that 0137 # DBC_set_range doesn't have a memleak under valgrind) 0138 rec = c.set_range(999999) 0139 assert rec == None 0140 if verbose: 0141 print rec 0142 0143 c.close() 0144 d.close() 0145 0146 d = db.DB() 0147 d.open(self.filename) 0148 c = d.cursor() 0149 0150 # put a record beyond the consecutive end of the recno's 0151 d[100] = "way out there" 0152 assert d[100] == "way out there" 0153 0154 try: 0155 data = d[99] 0156 except KeyError: 0157 pass 0158 else: 0159 self.fail("expected exception") 0160 0161 try: 0162 d.get(99) 0163 except db.DBKeyEmptyError, val: 0164 assert val[0] == db.DB_KEYEMPTY 0165 if verbose: print val 0166 else: 0167 self.fail("expected exception") 0168 0169 rec = c.set(40) 0170 while rec: 0171 if verbose: 0172 print rec 0173 rec = c.next() 0174 0175 c.close() 0176 d.close() 0177 0178 def test02_WithSource(self): 0179 """ 0180 A Recno file that is given a "backing source file" is essentially a 0181 simple ASCII file. Normally each record is delimited by \n and so is 0182 just a line in the file, but you can set a different record delimiter 0183 if needed. 0184 """ 0185 source = os.path.join(os.path.dirname(sys.argv[0]), 0186 'db_home/test_recno.txt') 0187 if not os.path.isdir('db_home'): 0188 os.mkdir('db_home') 0189 f = open(source, 'w') # create the file 0190 f.close() 0191 0192 d = db.DB() 0193 # This is the default value, just checking if both int 0194 d.set_re_delim(0x0A) 0195 d.set_re_delim('\n') # and char can be used... 0196 d.set_re_source(source) 0197 d.open(self.filename, db.DB_RECNO, db.DB_CREATE) 0198 0199 data = "The quick brown fox jumped over the lazy dog".split() 0200 for datum in data: 0201 d.append(datum) 0202 d.sync() 0203 d.close() 0204 0205 # get the text from the backing source 0206 text = open(source, 'r').read() 0207 text = text.strip() 0208 if verbose: 0209 print text 0210 print data 0211 print text.split('\n') 0212 0213 assert text.split('\n') == data 0214 0215 # open as a DB again 0216 d = db.DB() 0217 d.set_re_source(source) 0218 d.open(self.filename, db.DB_RECNO) 0219 0220 d[3] = 'reddish-brown' 0221 d[8] = 'comatose' 0222 0223 d.sync() 0224 d.close() 0225 0226 text = open(source, 'r').read() 0227 text = text.strip() 0228 if verbose: 0229 print text 0230 print text.split('\n') 0231 0232 assert text.split('\n') == \ 0233 "The quick reddish-brown fox jumped over the comatose dog".split() 0234 0235 def test03_FixedLength(self): 0236 d = db.DB() 0237 d.set_re_len(40) # fixed length records, 40 bytes long 0238 d.set_re_pad('-') # sets the pad character... 0239 d.set_re_pad(45) # ...test both int and char 0240 d.open(self.filename, db.DB_RECNO, db.DB_CREATE) 0241 0242 for x in letters: 0243 d.append(x * 35) # These will be padded 0244 0245 d.append('.' * 40) # this one will be exact 0246 0247 try: # this one will fail 0248 d.append('bad' * 20) 0249 except db.DBInvalidArgError, val: 0250 assert val[0] == db.EINVAL 0251 if verbose: print val 0252 else: 0253 self.fail("expected exception") 0254 0255 c = d.cursor() 0256 rec = c.first() 0257 while rec: 0258 if verbose: 0259 print rec 0260 rec = c.next() 0261 0262 c.close() 0263 d.close() 0264 0265 0266 #---------------------------------------------------------------------- 0267 0268 0269 def test_suite(): 0270 return unittest.makeSuite(SimpleRecnoTestCase) 0271 0272 0273 if __name__ == '__main__': 0274 unittest.main(defaultTest='test_suite') 0275
Generated by PyXR 0.9.4