PyXR

c:\python24\lib \ test \ test_bz2.py



0001 #!/usr/bin/python
0002 from test import test_support
0003 from test.test_support import TESTFN
0004 
0005 import unittest
0006 from cStringIO import StringIO
0007 import os
0008 import popen2
0009 import sys
0010 
0011 import bz2
0012 from bz2 import BZ2File, BZ2Compressor, BZ2Decompressor
0013 
0014 has_cmdline_bunzip2 = sys.platform not in ("win32", "os2emx", "riscos")
0015 
0016 class BaseTest(unittest.TestCase):
0017     "Base for other testcases."
0018     TEXT = 'root:x:0:0:root:/root:/bin/bash\nbin:x:1:1:bin:/bin:\ndaemon:x:2:2:daemon:/sbin:\nadm:x:3:4:adm:/var/adm:\nlp:x:4:7:lp:/var/spool/lpd:\nsync:x:5:0:sync:/sbin:/bin/sync\nshutdown:x:6:0:shutdown:/sbin:/sbin/shutdown\nhalt:x:7:0:halt:/sbin:/sbin/halt\nmail:x:8:12:mail:/var/spool/mail:\nnews:x:9:13:news:/var/spool/news:\nuucp:x:10:14:uucp:/var/spool/uucp:\noperator:x:11:0:operator:/root:\ngames:x:12:100:games:/usr/games:\ngopher:x:13:30:gopher:/usr/lib/gopher-data:\nftp:x:14:50:FTP User:/var/ftp:/bin/bash\nnobody:x:65534:65534:Nobody:/home:\npostfix:x:100:101:postfix:/var/spool/postfix:\nniemeyer:x:500:500::/home/niemeyer:/bin/bash\npostgres:x:101:102:PostgreSQL Server:/var/lib/pgsql:/bin/bash\nmysql:x:102:103:MySQL server:/var/lib/mysql:/bin/bash\nwww:x:103:104::/var/www:/bin/false\n'
0019     DATA = 'BZh91AY&SY.\xc8N\x18\x00\x01>_\x80\x00\x10@\x02\xff\xf0\x01\x07n\x00?\xe7\xff\xe00\x01\x99\xaa\x00\xc0\x03F\x86\x8c#&\x83F\x9a\x03\x06\xa6\xd0\xa6\x93M\x0fQ\xa7\xa8\x06\x804hh\x12$\x11\xa4i4\xf14S\xd2<Q\xb5\x0fH\xd3\xd4\xdd\xd5\x87\xbb\xf8\x94\r\x8f\xafI\x12\xe1\xc9\xf8/E\x00pu\x89\x12]\xc9\xbbDL\nQ\x0e\t1\x12\xdf\xa0\xc0\x97\xac2O9\x89\x13\x94\x0e\x1c7\x0ed\x95I\x0c\xaaJ\xa4\x18L\x10\x05#\x9c\xaf\xba\xbc/\x97\x8a#C\xc8\xe1\x8cW\xf9\xe2\xd0\xd6M\xa7\x8bXa<e\x84t\xcbL\xb3\xa7\xd9\xcd\xd1\xcb\x84.\xaf\xb3\xab\xab\xad`n}\xa0lh\tE,\x8eZ\x15\x17VH>\x88\xe5\xcd9gd6\x0b\n\xe9\x9b\xd5\x8a\x99\xf7\x08.K\x8ev\xfb\xf7xw\xbb\xdf\xa1\x92\xf1\xdd|/";\xa2\xba\x9f\xd5\xb1#A\xb6\xf6\xb3o\xc9\xc5y\\\xebO\xe7\x85\x9a\xbc\xb6f8\x952\xd5\xd7"%\x89>V,\xf7\xa6z\xe2\x9f\xa3\xdf\x11\x11"\xd6E)I\xa9\x13^\xca\xf3r\xd0\x03U\x922\xf26\xec\xb6\xed\x8b\xc3U\x13\x9d\xc5\x170\xa4\xfa^\x92\xacDF\x8a\x97\xd6\x19\xfe\xdd\xb8\xbd\x1a\x9a\x19\xa3\x80ankR\x8b\xe5\xd83]\xa9\xc6\x08\x82f\xf6\xb9"6l$\xb8j@\xc0\x8a\xb0l1..\xbak\x83ls\x15\xbc\xf4\xc1\x13\xbe\xf8E\xb8\x9d\r\xa8\x9dk\x84\xd3n\xfa\xacQ\x07\xb1%y\xaav\xb4\x08\xe0z\x1b\x16\xf5\x04\xe9\xcc\xb9\x08z\x1en7.G\xfc]\xc9\x14\xe1B@\xbb!8`'
0020     DATA_CRLF = 'BZh91AY&SY\xaez\xbbN\x00\x01H\xdf\x80\x00\x12@\x02\xff\xf0\x01\x07n\x00?\xe7\xff\xe0@\x01\xbc\xc6`\x86*\x8d=M\xa9\x9a\x86\xd0L@\x0fI\xa6!\xa1\x13\xc8\x88jdi\x8d@\x03@\x1a\x1a\x0c\x0c\x83 \x00\xc4h2\x19\x01\x82D\x84e\t\xe8\x99\x89\x19\x1ah\x00\r\x1a\x11\xaf\x9b\x0fG\xf5(\x1b\x1f?\t\x12\xcf\xb5\xfc\x95E\x00ps\x89\x12^\xa4\xdd\xa2&\x05(\x87\x04\x98\x89u\xe40%\xb6\x19\'\x8c\xc4\x89\xca\x07\x0e\x1b!\x91UIFU%C\x994!DI\xd2\xfa\xf0\xf1N8W\xde\x13A\xf5\x9cr%?\x9f3;I45A\xd1\x8bT\xb1<l\xba\xcb_\xc00xY\x17r\x17\x88\x08\x08@\xa0\ry@\x10\x04$)`\xf2\xce\x89z\xb0s\xec\x9b.iW\x9d\x81\xb5-+t\x9f\x1a\'\x97dB\xf5x\xb5\xbe.[.\xd7\x0e\x81\xe7\x08\x1cN`\x88\x10\xca\x87\xc3!"\x80\x92R\xa1/\xd1\xc0\xe6mf\xac\xbd\x99\xcca\xb3\x8780>\xa4\xc7\x8d\x1a\\"\xad\xa1\xabyBg\x15\xb9l\x88\x88\x91k"\x94\xa4\xd4\x89\xae*\xa6\x0b\x10\x0c\xd6\xd4m\xe86\xec\xb5j\x8a\x86j\';\xca.\x01I\xf2\xaaJ\xe8\x88\x8cU+t3\xfb\x0c\n\xa33\x13r2\r\x16\xe0\xb3(\xbf\x1d\x83r\xe7M\xf0D\x1365\xd8\x88\xd3\xa4\x92\xcb2\x06\x04\\\xc1\xb0\xea//\xbek&\xd8\xe6+t\xe5\xa1\x13\xada\x16\xder5"w]\xa2i\xb7[\x97R \xe2IT\xcd;Z\x04dk4\xad\x8a\t\xd3\x81z\x10\xf1:^`\xab\x1f\xc5\xdc\x91N\x14$+\x9e\xae\xd3\x80'
0021 
0022     if has_cmdline_bunzip2:
0023         def decompress(self, data):
0024             pop = popen2.Popen3("bunzip2", capturestderr=1)
0025             pop.tochild.write(data)
0026             pop.tochild.close()
0027             ret = pop.fromchild.read()
0028             pop.fromchild.close()
0029             if pop.wait() != 0:
0030                 ret = bz2.decompress(data)
0031             return ret
0032 
0033     else:
0034         # popen2.Popen3 doesn't exist on Windows, and even if it did, bunzip2
0035         # isn't available to run.
0036         def decompress(self, data):
0037             return bz2.decompress(data)
0038 
0039 class BZ2FileTest(BaseTest):
0040     "Test MCRYPT type miscelaneous methods."
0041 
0042     def setUp(self):
0043         self.filename = TESTFN
0044 
0045     def tearDown(self):
0046         if os.path.isfile(self.filename):
0047             os.unlink(self.filename)
0048 
0049     def createTempFile(self, crlf=0):
0050         f = open(self.filename, "wb")
0051         if crlf:
0052             data = self.DATA_CRLF
0053         else:
0054             data = self.DATA
0055         f.write(data)
0056         f.close()
0057 
0058     def testRead(self):
0059         # "Test BZ2File.read()"
0060         self.createTempFile()
0061         bz2f = BZ2File(self.filename)
0062         self.assertRaises(TypeError, bz2f.read, None)
0063         self.assertEqual(bz2f.read(), self.TEXT)
0064         bz2f.close()
0065 
0066     def testReadChunk10(self):
0067         # "Test BZ2File.read() in chunks of 10 bytes"
0068         self.createTempFile()
0069         bz2f = BZ2File(self.filename)
0070         text = ''
0071         while 1:
0072             str = bz2f.read(10)
0073             if not str:
0074                 break
0075             text += str
0076         self.assertEqual(text, text)
0077         bz2f.close()
0078 
0079     def testRead100(self):
0080         # "Test BZ2File.read(100)"
0081         self.createTempFile()
0082         bz2f = BZ2File(self.filename)
0083         self.assertEqual(bz2f.read(100), self.TEXT[:100])
0084         bz2f.close()
0085 
0086     def testReadLine(self):
0087         # "Test BZ2File.readline()"
0088         self.createTempFile()
0089         bz2f = BZ2File(self.filename)
0090         self.assertRaises(TypeError, bz2f.readline, None)
0091         sio = StringIO(self.TEXT)
0092         for line in sio.readlines():
0093             self.assertEqual(bz2f.readline(), line)
0094         bz2f.close()
0095 
0096     def testReadLines(self):
0097         # "Test BZ2File.readlines()"
0098         self.createTempFile()
0099         bz2f = BZ2File(self.filename)
0100         self.assertRaises(TypeError, bz2f.readlines, None)
0101         sio = StringIO(self.TEXT)
0102         self.assertEqual(bz2f.readlines(), sio.readlines())
0103         bz2f.close()
0104 
0105     def testIterator(self):
0106         # "Test iter(BZ2File)"
0107         self.createTempFile()
0108         bz2f = BZ2File(self.filename)
0109         sio = StringIO(self.TEXT)
0110         self.assertEqual(list(iter(bz2f)), sio.readlines())
0111         bz2f.close()
0112 
0113     def testXReadLines(self):
0114         # "Test BZ2File.xreadlines()"
0115         self.createTempFile()
0116         bz2f = BZ2File(self.filename)
0117         sio = StringIO(self.TEXT)
0118         self.assertEqual(list(bz2f.xreadlines()), sio.readlines())
0119         bz2f.close()
0120 
0121     def testUniversalNewlinesLF(self):
0122         # "Test BZ2File.read() with universal newlines (\\n)"
0123         self.createTempFile()
0124         bz2f = BZ2File(self.filename, "rU")
0125         self.assertEqual(bz2f.read(), self.TEXT)
0126         self.assertEqual(bz2f.newlines, "\n")
0127         bz2f.close()
0128 
0129     def testUniversalNewlinesCRLF(self):
0130         # "Test BZ2File.read() with universal newlines (\\r\\n)"
0131         self.createTempFile(crlf=1)
0132         bz2f = BZ2File(self.filename, "rU")
0133         self.assertEqual(bz2f.read(), self.TEXT)
0134         self.assertEqual(bz2f.newlines, "\r\n")
0135         bz2f.close()
0136 
0137     def testWrite(self):
0138         # "Test BZ2File.write()"
0139         bz2f = BZ2File(self.filename, "w")
0140         self.assertRaises(TypeError, bz2f.write)
0141         bz2f.write(self.TEXT)
0142         bz2f.close()
0143         f = open(self.filename, 'rb')
0144         self.assertEqual(self.decompress(f.read()), self.TEXT)
0145         f.close()
0146 
0147     def testWriteChunks10(self):
0148         # "Test BZ2File.write() with chunks of 10 bytes"
0149         bz2f = BZ2File(self.filename, "w")
0150         n = 0
0151         while 1:
0152             str = self.TEXT[n*10:(n+1)*10]
0153             if not str:
0154                 break
0155             bz2f.write(str)
0156             n += 1
0157         bz2f.close()
0158         f = open(self.filename, 'rb')
0159         self.assertEqual(self.decompress(f.read()), self.TEXT)
0160         f.close()
0161 
0162     def testWriteLines(self):
0163         # "Test BZ2File.writelines()"
0164         bz2f = BZ2File(self.filename, "w")
0165         self.assertRaises(TypeError, bz2f.writelines)
0166         sio = StringIO(self.TEXT)
0167         bz2f.writelines(sio.readlines())
0168         bz2f.close()
0169         f = open(self.filename, 'rb')
0170         self.assertEqual(self.decompress(f.read()), self.TEXT)
0171         f.close()
0172 
0173     def testSeekForward(self):
0174         # "Test BZ2File.seek(150, 0)"
0175         self.createTempFile()
0176         bz2f = BZ2File(self.filename)
0177         self.assertRaises(TypeError, bz2f.seek)
0178         bz2f.seek(150)
0179         self.assertEqual(bz2f.read(), self.TEXT[150:])
0180         bz2f.close()
0181 
0182     def testSeekBackwards(self):
0183         # "Test BZ2File.seek(-150, 1)"
0184         self.createTempFile()
0185         bz2f = BZ2File(self.filename)
0186         bz2f.read(500)
0187         bz2f.seek(-150, 1)
0188         self.assertEqual(bz2f.read(), self.TEXT[500-150:])
0189         bz2f.close()
0190 
0191     def testSeekBackwardsFromEnd(self):
0192         # "Test BZ2File.seek(-150, 2)"
0193         self.createTempFile()
0194         bz2f = BZ2File(self.filename)
0195         bz2f.seek(-150, 2)
0196         self.assertEqual(bz2f.read(), self.TEXT[len(self.TEXT)-150:])
0197         bz2f.close()
0198 
0199     def testSeekPostEnd(self):
0200         # "Test BZ2File.seek(150000)"
0201         self.createTempFile()
0202         bz2f = BZ2File(self.filename)
0203         bz2f.seek(150000)
0204         self.assertEqual(bz2f.tell(), len(self.TEXT))
0205         self.assertEqual(bz2f.read(), "")
0206         bz2f.close()
0207 
0208     def testSeekPostEndTwice(self):
0209         # "Test BZ2File.seek(150000) twice"
0210         self.createTempFile()
0211         bz2f = BZ2File(self.filename)
0212         bz2f.seek(150000)
0213         bz2f.seek(150000)
0214         self.assertEqual(bz2f.tell(), len(self.TEXT))
0215         self.assertEqual(bz2f.read(), "")
0216         bz2f.close()
0217 
0218     def testSeekPreStart(self):
0219         # "Test BZ2File.seek(-150, 0)"
0220         self.createTempFile()
0221         bz2f = BZ2File(self.filename)
0222         bz2f.seek(-150)
0223         self.assertEqual(bz2f.tell(), 0)
0224         self.assertEqual(bz2f.read(), self.TEXT)
0225         bz2f.close()
0226 
0227     def testOpenDel(self):
0228         # "Test opening and deleting a file many times"
0229         self.createTempFile()
0230         for i in xrange(10000):
0231             o = BZ2File(self.filename)
0232             del o
0233 
0234     def testOpenNonexistent(self):
0235         # "Test opening a nonexistent file"
0236         self.assertRaises(IOError, BZ2File, "/non/existent")
0237 
0238 class BZ2CompressorTest(BaseTest):
0239     def testCompress(self):
0240         # "Test BZ2Compressor.compress()/flush()"
0241         bz2c = BZ2Compressor()
0242         self.assertRaises(TypeError, bz2c.compress)
0243         data = bz2c.compress(self.TEXT)
0244         data += bz2c.flush()
0245         self.assertEqual(self.decompress(data), self.TEXT)
0246 
0247     def testCompressChunks10(self):
0248         # "Test BZ2Compressor.compress()/flush() with chunks of 10 bytes"
0249         bz2c = BZ2Compressor()
0250         n = 0
0251         data = ''
0252         while 1:
0253             str = self.TEXT[n*10:(n+1)*10]
0254             if not str:
0255                 break
0256             data += bz2c.compress(str)
0257             n += 1
0258         data += bz2c.flush()
0259         self.assertEqual(self.decompress(data), self.TEXT)
0260 
0261 class BZ2DecompressorTest(BaseTest):
0262     def test_Constructor(self):
0263         self.assertRaises(TypeError, BZ2Decompressor, 42)
0264 
0265     def testDecompress(self):
0266         # "Test BZ2Decompressor.decompress()"
0267         bz2d = BZ2Decompressor()
0268         self.assertRaises(TypeError, bz2d.decompress)
0269         text = bz2d.decompress(self.DATA)
0270         self.assertEqual(text, self.TEXT)
0271 
0272     def testDecompressChunks10(self):
0273         # "Test BZ2Decompressor.decompress() with chunks of 10 bytes"
0274         bz2d = BZ2Decompressor()
0275         text = ''
0276         n = 0
0277         while 1:
0278             str = self.DATA[n*10:(n+1)*10]
0279             if not str:
0280                 break
0281             text += bz2d.decompress(str)
0282             n += 1
0283         self.assertEqual(text, self.TEXT)
0284 
0285     def testDecompressUnusedData(self):
0286         # "Test BZ2Decompressor.decompress() with unused data"
0287         bz2d = BZ2Decompressor()
0288         unused_data = "this is unused data"
0289         text = bz2d.decompress(self.DATA+unused_data)
0290         self.assertEqual(text, self.TEXT)
0291         self.assertEqual(bz2d.unused_data, unused_data)
0292 
0293     def testEOFError(self):
0294         # "Calling BZ2Decompressor.decompress() after EOS must raise EOFError"
0295         bz2d = BZ2Decompressor()
0296         text = bz2d.decompress(self.DATA)
0297         self.assertRaises(EOFError, bz2d.decompress, "anything")
0298 
0299 
0300 class FuncTest(BaseTest):
0301     "Test module functions"
0302 
0303     def testCompress(self):
0304         # "Test compress() function"
0305         data = bz2.compress(self.TEXT)
0306         self.assertEqual(self.decompress(data), self.TEXT)
0307 
0308     def testDecompress(self):
0309         # "Test decompress() function"
0310         text = bz2.decompress(self.DATA)
0311         self.assertEqual(text, self.TEXT)
0312 
0313     def testDecompressEmpty(self):
0314         # "Test decompress() function with empty string"
0315         text = bz2.decompress("")
0316         self.assertEqual(text, "")
0317 
0318     def testDecompressIncomplete(self):
0319         # "Test decompress() function with incomplete data"
0320         self.assertRaises(ValueError, bz2.decompress, self.DATA[:-10])
0321 
0322 def test_main():
0323     test_support.run_unittest(
0324         BZ2FileTest,
0325         BZ2CompressorTest,
0326         BZ2DecompressorTest,
0327         FuncTest
0328     )
0329 
0330 if __name__ == '__main__':
0331     test_main()
0332 
0333 # vim:ts=4:sw=4
0334 

Generated by PyXR 0.9.4
SourceForge.net Logo