"""Regression tests for the zlib module.

Covers the checksum functions, expected error conditions, one-shot
compression, and the incremental compressobj/decompressobj interfaces.
"""

import unittest
from test import test_support
import zlib
import random


def getbuf():
    """Return the contents of the test_zlib module file, repeated 8 times.

    This was in the original.  Avoid non-repeatable sources.
    Left here (unused) in case something wants to be done with it.
    Falls back to opening __file__ when imp cannot locate 'test_zlib'.
    """
    import imp
    try:
        source_file = imp.find_module('test_zlib')[0]
    except ImportError:
        source_file = open(__file__)
    # Close the handle even if the read fails.
    try:
        return source_file.read() * 8
    finally:
        source_file.close()


def _compress_in_steps(data, step):
    """Compress data with one compressobj, feeding it step bytes at a time.

    Returns the complete compressed stream (all chunks plus the final
    flush) as a single string.
    """
    co = zlib.compressobj()
    bufs = [co.compress(data[i:i+step]) for i in range(0, len(data), step)]
    bufs.append(co.flush())
    return ''.join(bufs)


class ChecksumTestCase(unittest.TestCase):
    # checksum test cases
    def test_crc32start(self):
        self.assertEqual(zlib.crc32(""), zlib.crc32("", 0))

    def test_crc32empty(self):
        self.assertEqual(zlib.crc32("", 0), 0)
        self.assertEqual(zlib.crc32("", 1), 1)
        self.assertEqual(zlib.crc32("", 432), 432)

    def test_adler32start(self):
        self.assertEqual(zlib.adler32(""), zlib.adler32("", 1))

    def test_adler32empty(self):
        self.assertEqual(zlib.adler32("", 0), 0)
        self.assertEqual(zlib.adler32("", 1), 1)
        self.assertEqual(zlib.adler32("", 432), 432)

    def assertEqual32(self, seen, expected):
        # 32-bit values masked -- checksums on 32- vs 64- bit machines
        # This is important if bit 31 (0x80000000) is set.
        # No long-literal suffix: it is redundant from Python 2.4 on
        # (PEP 237) and the value is the same.
        mask = 0xFFFFFFFF
        self.assertEqual(seen & mask, expected & mask)

    def test_penguins(self):
        self.assertEqual32(zlib.crc32("penguin", 0), 0xe5c1a120)
        self.assertEqual32(zlib.crc32("penguin", 1), 0x43b6aa94)
        self.assertEqual32(zlib.adler32("penguin", 0), 0x0bcf02f6)
        self.assertEqual32(zlib.adler32("penguin", 1), 0x0bd602f7)

        self.assertEqual(zlib.crc32("penguin"), zlib.crc32("penguin", 0))
        self.assertEqual(zlib.adler32("penguin"), zlib.adler32("penguin", 1))


class ExceptionTestCase(unittest.TestCase):
    # make sure we generate some expected errors
    def test_bigbits(self):
        # specifying total bits too large causes an error
        self.assertRaises(zlib.error,
                          zlib.compress, 'ERROR', zlib.MAX_WBITS + 1)

    def test_badcompressobj(self):
        # verify failure on building compress object with bad params
        self.assertRaises(ValueError, zlib.compressobj, 1, zlib.DEFLATED, 0)

    def test_baddecompressobj(self):
        # verify failure on building decompress object with bad params
        self.assertRaises(ValueError, zlib.decompressobj, 0)


class CompressTestCase(unittest.TestCase):
    # Test compression in one go (whole message compression)
    def test_speech(self):
        x = zlib.compress(HAMLET_SCENE)
        self.assertEqual(zlib.decompress(x), HAMLET_SCENE)

    def test_speech128(self):
        # compress more data
        data = HAMLET_SCENE * 128
        x = zlib.compress(data)
        self.assertEqual(zlib.decompress(x), data)


class CompressObjectTestCase(unittest.TestCase):
    # Test compression object
    def test_pair(self):
        # straightforward compress/decompress objects
        data = HAMLET_SCENE * 128
        co = zlib.compressobj()
        x1 = co.compress(data)
        x2 = co.flush()
        self.assertRaises(zlib.error, co.flush)  # second flush should not work
        dco = zlib.decompressobj()
        y1 = dco.decompress(x1 + x2)
        y2 = dco.flush()
        self.assertEqual(data, y1 + y2)

    def test_compressoptions(self):
        # specify lots of options to compressobj()
        level = 2
        method = zlib.DEFLATED
        wbits = -12
        memlevel = 9
        strategy = zlib.Z_FILTERED
        co = zlib.compressobj(level, method, wbits, memlevel, strategy)
        x1 = co.compress(HAMLET_SCENE)
        x2 = co.flush()
        dco = zlib.decompressobj(wbits)
        y1 = dco.decompress(x1 + x2)
        y2 = dco.flush()
        self.assertEqual(HAMLET_SCENE, y1 + y2)

    def test_compressincremental(self):
        # compress object in steps, decompress object as one-shot
        data = HAMLET_SCENE * 128
        combuf = _compress_in_steps(data, 256)

        dco = zlib.decompressobj()
        y1 = dco.decompress(combuf)
        y2 = dco.flush()
        self.assertEqual(data, y1 + y2)

    def test_decompinc(self, flush=False, source=None, cx=256, dcx=64):
        # compress object in steps, decompress object in steps
        source = source or HAMLET_SCENE
        data = source * 128
        combuf = _compress_in_steps(data, cx)

        self.assertEqual(data, zlib.decompress(combuf))

        dco = zlib.decompressobj()
        bufs = []
        for i in range(0, len(combuf), dcx):
            bufs.append(dco.decompress(combuf[i:i+dcx]))
            # Without a max_length, every call must consume all its input.
            self.assertEqual('', dco.unconsumed_tail,
                             "(A) uct should be '': not %d long" %
                             len(dco.unconsumed_tail))
        if flush:
            bufs.append(dco.flush())
        else:
            # Drain by feeding empty input until nothing more comes out.
            while True:
                chunk = dco.decompress('')
                if chunk:
                    bufs.append(chunk)
                else:
                    break
        self.assertEqual('', dco.unconsumed_tail,
                         "(B) uct should be '': not %d long" %
                         len(dco.unconsumed_tail))
        self.assertEqual(data, ''.join(bufs))
        # Failure means: "decompressobj with init options failed"

    def test_decompincflush(self):
        self.test_decompinc(flush=True)

    def test_decompimax(self, source=None, cx=256, dcx=64):
        # compress in steps, decompress in length-restricted steps
        source = source or HAMLET_SCENE
        # Check a decompression object with max_length specified
        data = source * 128
        combuf = _compress_in_steps(data, cx)
        self.assertEqual(data, zlib.decompress(combuf),
                         'compressed data failure')

        dco = zlib.decompressobj()
        bufs = []
        cb = combuf
        while cb:
            chunk = dco.decompress(cb, dcx)
            self.failIf(len(chunk) > dcx,
                        'chunk too big (%d>%d)' % (len(chunk), dcx))
            bufs.append(chunk)
            # Input not consumed because of the length cap is fed back in.
            cb = dco.unconsumed_tail
        bufs.append(dco.flush())
        self.assertEqual(data, ''.join(bufs), 'Wrong data retrieved')

    def test_decompressmaxlen(self, flush=False):
        # Check a decompression object with max_length specified
        data = HAMLET_SCENE * 128
        combuf = _compress_in_steps(data, 256)
        self.assertEqual(data, zlib.decompress(combuf),
                         'compressed data failure')

        dco = zlib.decompressobj()
        bufs = []
        cb = combuf
        while cb:
            max_length = 1 + len(cb)//10
            chunk = dco.decompress(cb, max_length)
            self.failIf(len(chunk) > max_length,
                        'chunk too big (%d>%d)' % (len(chunk),max_length))
            bufs.append(chunk)
            cb = dco.unconsumed_tail
        if flush:
            bufs.append(dco.flush())
        else:
            # Drain with empty input, reusing the last max_length, until
            # an empty chunk comes back.
            while chunk:
                chunk = dco.decompress('', max_length)
                self.failIf(len(chunk) > max_length,
                            'chunk too big (%d>%d)' % (len(chunk),max_length))
                bufs.append(chunk)
        self.assertEqual(data, ''.join(bufs), 'Wrong data retrieved')

    def test_decompressmaxlenflush(self):
        self.test_decompressmaxlen(flush=True)

    def test_maxlenmisc(self):
        # Misc tests of max_length
        dco = zlib.decompressobj()
        self.assertRaises(ValueError, dco.decompress, "", -1)
        self.assertEqual('', dco.unconsumed_tail)

    def test_flushes(self):
        # Test flush() with the various options, using all the
        # different levels in order to provide more variations.
        sync_opt = ['Z_NO_FLUSH', 'Z_SYNC_FLUSH', 'Z_FULL_FLUSH']
        sync_opt = [getattr(zlib, opt) for opt in sync_opt
                    if hasattr(zlib, opt)]
        data = HAMLET_SCENE * 8

        for sync in sync_opt:
            for level in range(10):
                obj = zlib.compressobj(level)
                a = obj.compress(data[:3000])
                b = obj.flush(sync)
                c = obj.compress(data[3000:])
                d = obj.flush()
                self.assertEqual(zlib.decompress(''.join([a,b,c,d])),
                                 data, ("Decompress failed: flush "
                                        "mode=%i, level=%i") % (sync, level))
                del obj

    def test_odd_flush(self):
        # Test for odd flushing bugs noted in 2.0, and hopefully fixed in 2.1
        if not hasattr(zlib, 'Z_SYNC_FLUSH'):
            return

        # Testing on 17K of "random" data

        # Create compressor and decompressor objects
        co = zlib.compressobj(zlib.Z_BEST_COMPRESSION)
        dco = zlib.decompressobj()

        # Try 17K of data
        # generate random data stream
        try:
            # In 2.3 and later, WichmannHill is the RNG of the bug report
            gen = random.WichmannHill()
        except AttributeError:
            try:
                # 2.2 called it Random
                gen = random.Random()
            except AttributeError:
                # others might simply have a single RNG
                gen = random
        gen.seed(1)
        data = genblock(1, 17 * 1024, generator=gen)

        # compress, sync-flush, and decompress
        first = co.compress(data)
        second = co.flush(zlib.Z_SYNC_FLUSH)
        expanded = dco.decompress(first + second)

        # if decompressed data is different from the input data, choke.
        self.assertEqual(expanded, data, "17K random source doesn't match")


def genblock(seed, length, step=1024, generator=random):
    """length-byte stream of random data from a seed (in step-byte blocks).

    The generator is reseeded only when seed is not None.  Whole
    step-sized blocks are generated and the result is truncated to
    length, so the generator may be advanced past length values.
    """
    if seed is not None:
        generator.seed(seed)
    randint = generator.randint
    if length < step or step < 2:
        step = length
    blocks = []
    for i in range(0, length, step):
        blocks.append(''.join([chr(randint(0,255))
                               for x in range(step)]))
    return ''.join(blocks)[:length]


def choose_lines(source, number, seed=None, generator=random):
    """Return a list of number lines randomly chosen from the source"""
    if seed is not None:
        generator.seed(seed)
    sources = source.split('\n')
    return [generator.choice(sources) for n in range(number)]


# Sample text used as compressible input by the test cases above.
HAMLET_SCENE = """
LAERTES

O, fear me not.
I stay too long: but here my father comes.

Enter POLONIUS

A double blessing is a double grace,
Occasion smiles upon a second leave.

LORD POLONIUS

Yet here, Laertes! aboard, aboard, for shame!
The wind sits in the shoulder of your sail,
And you are stay'd for. There; my blessing with thee!
And these few precepts in thy memory
See thou character. Give thy thoughts no tongue,
Nor any unproportioned thought his act.
Be thou familiar, but by no means vulgar.
Those friends thou hast, and their adoption tried,
Grapple them to thy soul with hoops of steel;
But do not dull thy palm with entertainment
Of each new-hatch'd, unfledged comrade. Beware
Of entrance to a quarrel, but being in,
Bear't that the opposed may beware of thee.
Give every man thy ear, but few thy voice;
Take each man's censure, but reserve thy judgment.
Costly thy habit as thy purse can buy,
But not express'd in fancy; rich, not gaudy;
For the apparel oft proclaims the man,
And they in France of the best rank and station
Are of a most select and generous chief in that.
Neither a borrower nor a lender be;
For loan oft loses both itself and friend,
And borrowing dulls the edge of husbandry.
This above all: to thine ownself be true,
And it must follow, as the night the day,
Thou canst not then be false to any man.
Farewell: my blessing season this in thee!

LAERTES

Most humbly do I take my leave, my lord.

LORD POLONIUS

The time invites you; go; your servants tend.

LAERTES

Farewell, Ophelia; and remember well
What I have said to you.

OPHELIA

'Tis in my memory lock'd,
And you yourself shall keep the key of it.

LAERTES

Farewell.
"""


def test_main():
    """Run every test case class in this module via test_support."""
    test_support.run_unittest(
        ChecksumTestCase,
        ExceptionTestCase,
        CompressTestCase,
        CompressObjectTestCase
    )


def test(tests=''):
    """Run a selection of the test case classes.

    tests is a string of selector letters: 'k' checksum, 'x' exception,
    'c' one-shot compress, 'o' compress-object.  An empty value selects
    'o' only.
    """
    if not tests:
        tests = 'o'
    selectable = [
        ('k', ChecksumTestCase),
        ('x', ExceptionTestCase),
        ('c', CompressTestCase),
        ('o', CompressObjectTestCase),
    ]
    testcases = [case for letter, case in selectable if letter in tests]
    test_support.run_unittest(*testcases)


# Example of running a single test case by hand from a source checkout
# (never executed; kept for reference):
#
#     import sys
#     sys.path.insert(1, '/Py23Src/python/dist/src/Lib/test')
#     import test_zlib as tz
#     ts, ut = tz.test_support, tz.unittest
#     su = ut.TestSuite()
#     su.addTest(ut.makeSuite(tz.CompressTestCase))
#     ts.run_suite(su)


if __name__ == "__main__":
    test_main()

# Generated by PyXR 0.9.4