#!/usr/bin/python
"""Regression tests for the bz2 module.

Covers the BZ2File file-like object, the incremental BZ2Compressor and
BZ2Decompressor objects, and the one-shot bz2.compress()/bz2.decompress()
functions.  Fixtures are a small passwd-style text (TEXT) and its bzip2
compressed forms (DATA, DATA_CRLF).
"""
from test import test_support
from test.test_support import TESTFN

import unittest
from cStringIO import StringIO
import os
import popen2
import sys

import bz2
from bz2 import BZ2File, BZ2Compressor, BZ2Decompressor

# Platforms on which an external "bunzip2" cannot be driven through popen2.
has_cmdline_bunzip2 = sys.platform not in ("win32", "os2emx", "riscos")


class BaseTest(unittest.TestCase):
    "Base for other testcases."

    # Uncompressed reference text.
    TEXT = 'root:x:0:0:root:/root:/bin/bash\nbin:x:1:1:bin:/bin:\ndaemon:x:2:2:daemon:/sbin:\nadm:x:3:4:adm:/var/adm:\nlp:x:4:7:lp:/var/spool/lpd:\nsync:x:5:0:sync:/sbin:/bin/sync\nshutdown:x:6:0:shutdown:/sbin:/sbin/shutdown\nhalt:x:7:0:halt:/sbin:/sbin/halt\nmail:x:8:12:mail:/var/spool/mail:\nnews:x:9:13:news:/var/spool/news:\nuucp:x:10:14:uucp:/var/spool/uucp:\noperator:x:11:0:operator:/root:\ngames:x:12:100:games:/usr/games:\ngopher:x:13:30:gopher:/usr/lib/gopher-data:\nftp:x:14:50:FTP User:/var/ftp:/bin/bash\nnobody:x:65534:65534:Nobody:/home:\npostfix:x:100:101:postfix:/var/spool/postfix:\nniemeyer:x:500:500::/home/niemeyer:/bin/bash\npostgres:x:101:102:PostgreSQL Server:/var/lib/pgsql:/bin/bash\nmysql:x:102:103:MySQL server:/var/lib/mysql:/bin/bash\nwww:x:103:104::/var/www:/bin/false\n'
    # bzip2 stream that the tests expect to decompress to TEXT.
    DATA = 'BZh91AY&SY.\xc8N\x18\x00\x01>_\x80\x00\x10@\x02\xff\xf0\x01\x07n\x00?\xe7\xff\xe00\x01\x99\xaa\x00\xc0\x03F\x86\x8c#&\x83F\x9a\x03\x06\xa6\xd0\xa6\x93M\x0fQ\xa7\xa8\x06\x804hh\x12$\x11\xa4i4\xf14S\xd2<Q\xb5\x0fH\xd3\xd4\xdd\xd5\x87\xbb\xf8\x94\r\x8f\xafI\x12\xe1\xc9\xf8/E\x00pu\x89\x12]\xc9\xbbDL\nQ\x0e\t1\x12\xdf\xa0\xc0\x97\xac2O9\x89\x13\x94\x0e\x1c7\x0ed\x95I\x0c\xaaJ\xa4\x18L\x10\x05#\x9c\xaf\xba\xbc/\x97\x8a#C\xc8\xe1\x8cW\xf9\xe2\xd0\xd6M\xa7\x8bXa<e\x84t\xcbL\xb3\xa7\xd9\xcd\xd1\xcb\x84.\xaf\xb3\xab\xab\xad`n}\xa0lh\tE,\x8eZ\x15\x17VH>\x88\xe5\xcd9gd6\x0b\n\xe9\x9b\xd5\x8a\x99\xf7\x08.K\x8ev\xfb\xf7xw\xbb\xdf\xa1\x92\xf1\xdd|/";\xa2\xba\x9f\xd5\xb1#A\xb6\xf6\xb3o\xc9\xc5y\\\xebO\xe7\x85\x9a\xbc\xb6f8\x952\xd5\xd7"%\x89>V,\xf7\xa6z\xe2\x9f\xa3\xdf\x11\x11"\xd6E)I\xa9\x13^\xca\xf3r\xd0\x03U\x922\xf26\xec\xb6\xed\x8b\xc3U\x13\x9d\xc5\x170\xa4\xfa^\x92\xacDF\x8a\x97\xd6\x19\xfe\xdd\xb8\xbd\x1a\x9a\x19\xa3\x80ankR\x8b\xe5\xd83]\xa9\xc6\x08\x82f\xf6\xb9"6l$\xb8j@\xc0\x8a\xb0l1..\xbak\x83ls\x15\xbc\xf4\xc1\x13\xbe\xf8E\xb8\x9d\r\xa8\x9dk\x84\xd3n\xfa\xacQ\x07\xb1%y\xaav\xb4\x08\xe0z\x1b\x16\xf5\x04\xe9\xcc\xb9\x08z\x1en7.G\xfc]\xc9\x14\xe1B@\xbb!8`'
    # bzip2 stream used by the universal-newlines test, which expects it to
    # read back as TEXT with the newlines attribute reporting "\r\n".
    DATA_CRLF = 'BZh91AY&SY\xaez\xbbN\x00\x01H\xdf\x80\x00\x12@\x02\xff\xf0\x01\x07n\x00?\xe7\xff\xe0@\x01\xbc\xc6`\x86*\x8d=M\xa9\x9a\x86\xd0L@\x0fI\xa6!\xa1\x13\xc8\x88jdi\x8d@\x03@\x1a\x1a\x0c\x0c\x83 \x00\xc4h2\x19\x01\x82D\x84e\t\xe8\x99\x89\x19\x1ah\x00\r\x1a\x11\xaf\x9b\x0fG\xf5(\x1b\x1f?\t\x12\xcf\xb5\xfc\x95E\x00ps\x89\x12^\xa4\xdd\xa2&\x05(\x87\x04\x98\x89u\xe40%\xb6\x19\'\x8c\xc4\x89\xca\x07\x0e\x1b!\x91UIFU%C\x994!DI\xd2\xfa\xf0\xf1N8W\xde\x13A\xf5\x9cr%?\x9f3;I45A\xd1\x8bT\xb1<l\xba\xcb_\xc00xY\x17r\x17\x88\x08\x08@\xa0\ry@\x10\x04$)`\xf2\xce\x89z\xb0s\xec\x9b.iW\x9d\x81\xb5-+t\x9f\x1a\'\x97dB\xf5x\xb5\xbe.[.\xd7\x0e\x81\xe7\x08\x1cN`\x88\x10\xca\x87\xc3!"\x80\x92R\xa1/\xd1\xc0\xe6mf\xac\xbd\x99\xcca\xb3\x8780>\xa4\xc7\x8d\x1a\\"\xad\xa1\xabyBg\x15\xb9l\x88\x88\x91k"\x94\xa4\xd4\x89\xae*\xa6\x0b\x10\x0c\xd6\xd4m\xe86\xec\xb5j\x8a\x86j\';\xca.\x01I\xf2\xaaJ\xe8\x88\x8cU+t3\xfb\x0c\n\xa33\x13r2\r\x16\xe0\xb3(\xbf\x1d\x83r\xe7M\xf0D\x1365\xd8\x88\xd3\xa4\x92\xcb2\x06\x04\\\xc1\xb0\xea//\xbek&\xd8\xe6+t\xe5\xa1\x13\xada\x16\xder5"w]\xa2i\xb7[\x97R \xe2IT\xcd;Z\x04dk4\xad\x8a\t\xd3\x81z\x10\xf1:^`\xab\x1f\xc5\xdc\x91N\x14$+\x9e\xae\xd3\x80'

    if has_cmdline_bunzip2:
        def decompress(self, data):
            """Decompress data with the external bunzip2 program.

            Falls back to bz2.decompress() when bunzip2 exits with a
            non-zero status.
            """
            pop = popen2.Popen3("bunzip2", capturestderr=1)
            pop.tochild.write(data)
            pop.tochild.close()
            ret = pop.fromchild.read()
            pop.fromchild.close()
            if pop.wait() != 0:
                ret = bz2.decompress(data)
            return ret

    else:
        # popen2.Popen3 doesn't exist on Windows, and even if it did, bunzip2
        # isn't available to run.
        def decompress(self, data):
            """Decompress data with bz2.decompress()."""
            return bz2.decompress(data)


# NOTE: the per-test descriptions below are comments rather than docstrings;
# a docstring would change how unittest labels each test in verbose output.

class BZ2FileTest(BaseTest):
    "Test BZ2File type miscellaneous methods."

    def setUp(self):
        self.filename = TESTFN

    def tearDown(self):
        if os.path.isfile(self.filename):
            os.unlink(self.filename)

    def createTempFile(self, crlf=0):
        # Write the compressed fixture (DATA_CRLF when crlf is true, DATA
        # otherwise) to self.filename.
        f = open(self.filename, "wb")
        if crlf:
            data = self.DATA_CRLF
        else:
            data = self.DATA
        f.write(data)
        f.close()

    def testRead(self):
        # "Test BZ2File.read()"
        self.createTempFile()
        bz2f = BZ2File(self.filename)
        self.assertRaises(TypeError, bz2f.read, None)
        self.assertEqual(bz2f.read(), self.TEXT)
        bz2f.close()

    def testReadChunk10(self):
        # "Test BZ2File.read() in chunks of 10 bytes"
        self.createTempFile()
        bz2f = BZ2File(self.filename)
        text = ''
        while 1:
            chunk = bz2f.read(10)
            if not chunk:
                break
            text += chunk
        # Compare against the reference text; comparing text with itself
        # would pass regardless of what was read.
        self.assertEqual(text, self.TEXT)
        bz2f.close()

    def testRead100(self):
        # "Test BZ2File.read(100)"
        self.createTempFile()
        bz2f = BZ2File(self.filename)
        self.assertEqual(bz2f.read(100), self.TEXT[:100])
        bz2f.close()

    def testReadLine(self):
        # "Test BZ2File.readline()"
        self.createTempFile()
        bz2f = BZ2File(self.filename)
        self.assertRaises(TypeError, bz2f.readline, None)
        sio = StringIO(self.TEXT)
        for line in sio.readlines():
            self.assertEqual(bz2f.readline(), line)
        bz2f.close()

    def testReadLines(self):
        # "Test BZ2File.readlines()"
        self.createTempFile()
        bz2f = BZ2File(self.filename)
        self.assertRaises(TypeError, bz2f.readlines, None)
        sio = StringIO(self.TEXT)
        self.assertEqual(bz2f.readlines(), sio.readlines())
        bz2f.close()

    def testIterator(self):
        # "Test iter(BZ2File)"
        self.createTempFile()
        bz2f = BZ2File(self.filename)
        sio = StringIO(self.TEXT)
        self.assertEqual(list(iter(bz2f)), sio.readlines())
        bz2f.close()

    def testXReadLines(self):
        # "Test BZ2File.xreadlines()"
        self.createTempFile()
        bz2f = BZ2File(self.filename)
        sio = StringIO(self.TEXT)
        self.assertEqual(list(bz2f.xreadlines()), sio.readlines())
        bz2f.close()

    def testUniversalNewlinesLF(self):
        # "Test BZ2File.read() with universal newlines (\\n)"
        self.createTempFile()
        bz2f = BZ2File(self.filename, "rU")
        self.assertEqual(bz2f.read(), self.TEXT)
        self.assertEqual(bz2f.newlines, "\n")
        bz2f.close()

    def testUniversalNewlinesCRLF(self):
        # "Test BZ2File.read() with universal newlines (\\r\\n)"
        self.createTempFile(crlf=1)
        bz2f = BZ2File(self.filename, "rU")
        self.assertEqual(bz2f.read(), self.TEXT)
        self.assertEqual(bz2f.newlines, "\r\n")
        bz2f.close()

    def testWrite(self):
        # "Test BZ2File.write()"
        bz2f = BZ2File(self.filename, "w")
        self.assertRaises(TypeError, bz2f.write)
        bz2f.write(self.TEXT)
        bz2f.close()
        # Read and close before asserting so a failure cannot leave the
        # file open when tearDown unlinks it.
        f = open(self.filename, 'rb')
        data = f.read()
        f.close()
        self.assertEqual(self.decompress(data), self.TEXT)

    def testWriteChunks10(self):
        # "Test BZ2File.write() with chunks of 10 bytes"
        bz2f = BZ2File(self.filename, "w")
        n = 0
        while 1:
            chunk = self.TEXT[n*10:(n+1)*10]
            if not chunk:
                break
            bz2f.write(chunk)
            n += 1
        bz2f.close()
        # Read and close before asserting (see testWrite).
        f = open(self.filename, 'rb')
        data = f.read()
        f.close()
        self.assertEqual(self.decompress(data), self.TEXT)

    def testWriteLines(self):
        # "Test BZ2File.writelines()"
        bz2f = BZ2File(self.filename, "w")
        self.assertRaises(TypeError, bz2f.writelines)
        sio = StringIO(self.TEXT)
        bz2f.writelines(sio.readlines())
        bz2f.close()
        # Read and close before asserting (see testWrite).
        f = open(self.filename, 'rb')
        data = f.read()
        f.close()
        self.assertEqual(self.decompress(data), self.TEXT)

    def testSeekForward(self):
        # "Test BZ2File.seek(150, 0)"
        self.createTempFile()
        bz2f = BZ2File(self.filename)
        self.assertRaises(TypeError, bz2f.seek)
        bz2f.seek(150)
        self.assertEqual(bz2f.read(), self.TEXT[150:])
        bz2f.close()

    def testSeekBackwards(self):
        # "Test BZ2File.seek(-150, 1)"
        self.createTempFile()
        bz2f = BZ2File(self.filename)
        bz2f.read(500)
        bz2f.seek(-150, 1)
        self.assertEqual(bz2f.read(), self.TEXT[500-150:])
        bz2f.close()

    def testSeekBackwardsFromEnd(self):
        # "Test BZ2File.seek(-150, 2)"
        self.createTempFile()
        bz2f = BZ2File(self.filename)
        bz2f.seek(-150, 2)
        self.assertEqual(bz2f.read(), self.TEXT[len(self.TEXT)-150:])
        bz2f.close()

    def testSeekPostEnd(self):
        # "Test BZ2File.seek(150000)"
        self.createTempFile()
        bz2f = BZ2File(self.filename)
        bz2f.seek(150000)
        self.assertEqual(bz2f.tell(), len(self.TEXT))
        self.assertEqual(bz2f.read(), "")
        bz2f.close()

    def testSeekPostEndTwice(self):
        # "Test BZ2File.seek(150000) twice"
        self.createTempFile()
        bz2f = BZ2File(self.filename)
        bz2f.seek(150000)
        bz2f.seek(150000)
        self.assertEqual(bz2f.tell(), len(self.TEXT))
        self.assertEqual(bz2f.read(), "")
        bz2f.close()

    def testSeekPreStart(self):
        # "Test BZ2File.seek(-150, 0)"
        self.createTempFile()
        bz2f = BZ2File(self.filename)
        bz2f.seek(-150)
        self.assertEqual(bz2f.tell(), 0)
        self.assertEqual(bz2f.read(), self.TEXT)
        bz2f.close()

    def testOpenDel(self):
        # "Test opening and deleting a file many times"
        self.createTempFile()
        for i in xrange(10000):
            o = BZ2File(self.filename)
            del o

    def testOpenNonexistent(self):
        # "Test opening a nonexistent file"
        self.assertRaises(IOError, BZ2File, "/non/existent")


class BZ2CompressorTest(BaseTest):
    "Test the incremental BZ2Compressor object."

    def testCompress(self):
        # "Test BZ2Compressor.compress()/flush()"
        bz2c = BZ2Compressor()
        self.assertRaises(TypeError, bz2c.compress)
        data = bz2c.compress(self.TEXT)
        data += bz2c.flush()
        self.assertEqual(self.decompress(data), self.TEXT)

    def testCompressChunks10(self):
        # "Test BZ2Compressor.compress()/flush() with chunks of 10 bytes"
        bz2c = BZ2Compressor()
        n = 0
        data = ''
        while 1:
            chunk = self.TEXT[n*10:(n+1)*10]
            if not chunk:
                break
            data += bz2c.compress(chunk)
            n += 1
        data += bz2c.flush()
        self.assertEqual(self.decompress(data), self.TEXT)


class BZ2DecompressorTest(BaseTest):
    "Test the incremental BZ2Decompressor object."

    def test_Constructor(self):
        self.assertRaises(TypeError, BZ2Decompressor, 42)

    def testDecompress(self):
        # "Test BZ2Decompressor.decompress()"
        bz2d = BZ2Decompressor()
        self.assertRaises(TypeError, bz2d.decompress)
        text = bz2d.decompress(self.DATA)
        self.assertEqual(text, self.TEXT)

    def testDecompressChunks10(self):
        # "Test BZ2Decompressor.decompress() with chunks of 10 bytes"
        bz2d = BZ2Decompressor()
        text = ''
        n = 0
        while 1:
            chunk = self.DATA[n*10:(n+1)*10]
            if not chunk:
                break
            text += bz2d.decompress(chunk)
            n += 1
        self.assertEqual(text, self.TEXT)

    def testDecompressUnusedData(self):
        # "Test BZ2Decompressor.decompress() with unused data"
        bz2d = BZ2Decompressor()
        unused_data = "this is unused data"
        text = bz2d.decompress(self.DATA+unused_data)
        self.assertEqual(text, self.TEXT)
        self.assertEqual(bz2d.unused_data, unused_data)

    def testEOFError(self):
        # "Calling BZ2Decompressor.decompress() after EOS must raise EOFError"
        bz2d = BZ2Decompressor()
        # Consume the whole stream first; the result itself is not needed.
        bz2d.decompress(self.DATA)
        self.assertRaises(EOFError, bz2d.decompress, "anything")


class FuncTest(BaseTest):
    "Test module functions"

    def testCompress(self):
        # "Test compress() function"
        data = bz2.compress(self.TEXT)
        self.assertEqual(self.decompress(data), self.TEXT)

    def testDecompress(self):
        # "Test decompress() function"
        text = bz2.decompress(self.DATA)
        self.assertEqual(text, self.TEXT)

    def testDecompressEmpty(self):
        # "Test decompress() function with empty string"
        text = bz2.decompress("")
        self.assertEqual(text, "")

    def testDecompressIncomplete(self):
        # "Test decompress() function with incomplete data"
        self.assertRaises(ValueError, bz2.decompress, self.DATA[:-10])


def test_main():
    """Run every test case class in this module through test_support."""
    test_support.run_unittest(
        BZ2FileTest,
        BZ2CompressorTest,
        BZ2DecompressorTest,
        FuncTest
    )


if __name__ == '__main__':
    test_main()

# vim:ts=4:sw=4
Generated by PyXR 0.9.4