"""
TestCases for exercising a Queue DB.
"""

import sys, os, string
import tempfile
from pprint import pprint
import unittest

try:
    # For Pythons w/distutils pybsddb
    from bsddb3 import db
except ImportError:
    # For Python 2.3
    from bsddb import db

from test_all import verbose


#----------------------------------------------------------------------

class SimpleQueueTestCase(unittest.TestCase):
    """Fill a Queue database, reopen it, and drain it with consume().

    Each test works on its own database file, named in setUp() and
    removed in tearDown().
    """

    def setUp(self):
        # Only a file *name* is needed here: the database file itself is
        # created later by DB.open(..., db.DB_CREATE).
        # NOTE(review): tempfile.mktemp() is documented as race-prone;
        # consider a private directory from tempfile.mkdtemp() if every
        # supported Python provides it.
        self.filename = tempfile.mktemp()

    def tearDown(self):
        # Best-effort cleanup: the file may not exist if the test bailed
        # out before creating the database.
        try:
            os.remove(self.filename)
        except os.error:
            pass

    def _banner(self, test_name):
        """Print the "Running ..." header for test_name when verbose."""
        if verbose:
            print('\n' + '-=' * 30)
            print("Running %s.%s..." % (self.__class__.__name__, test_name))

    def _show_stat(self, label, d):
        """Print label and the statistics of d when verbose."""
        if verbose:
            print(label + '-' * 30)
            pprint(d.stat())

    def _build_queue(self):
        """Create the Queue database in self.filename, fill it, close it.

        Appends one 40-byte record per ASCII letter (52 records), then
        stores four more records by explicit record number; record 1
        already exists, so the final count is 55.
        """
        d = db.DB()
        try:
            d.set_re_len(40)  # Queues must be fixed length
            d.open(self.filename, db.DB_QUEUE, db.DB_CREATE)

            self._show_stat("before appends", d)

            # ascii_letters rather than string.letters: the latter is
            # locale-dependent, which would break the counts below.
            for x in string.ascii_letters:
                d.append(x * 40)

            self.assertEqual(len(d), 52)

            d.put(100, "some more data")
            d.put(101, "and some more ")
            d.put(75, "out of order")
            d.put(1, "replacement data")

            self.assertEqual(len(d), 55)

            self._show_stat("before close", d)
        finally:
            # Close even when a check fails so tearDown can remove the file.
            d.close()

    def test01_basic(self):
        # Basic Queue tests using the deprecated DBCursor.consume method.
        self._banner("test01_basic")
        self._build_queue()

        d = db.DB()
        try:
            d.open(self.filename)
            self._show_stat("after open", d)

            d.append("one more")
            c = d.cursor()
            try:
                self._show_stat("after append", d)

                # Drain the queue; the loop ends when consume() returns
                # a false value.
                rec = c.consume()
                while rec:
                    if verbose:
                        print(rec)
                    rec = c.consume()
            finally:
                c.close()

            self._show_stat("after consume loop", d)

            self.assertEqual(
                len(d), 0,
                "if you see this message then you need to rebuild "
                "BerkeleyDB 3.1.17 with the patch in patches/qam_stat.diff")
        finally:
            d.close()

    def test02_basicPost32(self):
        # Basic Queue tests using the new DB.consume method in DB 3.2+
        # (No cursor needed)
        self._banner("test02_basicPost32")

        if db.version() < (3, 2, 0):
            if verbose:
                print("Test not run, DB not new enough...")
            return

        self._build_queue()

        d = db.DB()
        try:
            d.open(self.filename)
            #d.set_get_returns_none(true)

            self._show_stat("after open", d)

            d.append("one more")

            self._show_stat("after append", d)

            # Drain the queue directly through the DB handle.
            rec = d.consume()
            while rec:
                if verbose:
                    print(rec)
                rec = d.consume()

            self._show_stat("after consume loop", d)
        finally:
            d.close()


#----------------------------------------------------------------------

def test_suite():
    """Return a suite containing every test in SimpleQueueTestCase."""
    return unittest.TestLoader().loadTestsFromTestCase(SimpleQueueTestCase)


if __name__ == '__main__':
    unittest.main(defaultTest='test_suite')

# Generated by PyXR 0.9.4