import unittest
import pickle
import cPickle
import pickletools
import copy_reg

from test.test_support import TestFailed, have_unicode, TESTFN

# Tests that try a number of pickle protocols should have a
#     for proto in protocols:
# kind of outer loop.
assert pickle.HIGHEST_PROTOCOL == cPickle.HIGHEST_PROTOCOL == 2
protocols = range(pickle.HIGHEST_PROTOCOL + 1)


# Return True if opcode code appears in the pickle, else False.
# Stops at the first match, so opcodes after it are never decoded.
# Note: the `pickle` parameter is the pickle string and shadows the
# pickle module inside this function.
def opcode_in_pickle(code, pickle):
    for op, dummy, dummy in pickletools.genops(pickle):
        if op.code == code:
            return True
    return False

# Return the number of times opcode code appears in pickle.
# Unlike opcode_in_pickle(), this always walks the whole pickle string.
def count_opcode(code, pickle):
    n = 0
    for op, dummy, dummy in pickletools.genops(pickle):
        if op.code == code:
            n += 1
    return n

# We can't very well test the extension registry without putting known stuff
# in it, but we have to be careful to restore its original state.  Code
# should do this:
#
#     e = ExtensionSaver(extension_code)
#     try:
#         fiddle w/ the extension registry's stuff for extension_code
#     finally:
#         e.restore()

class ExtensionSaver:
    # Remember current registration for code (if any), and remove it (if
    # there is one).  self.pair holds the saved copy_reg._inverted_registry
    # entry for code, or None when code was not registered.
    def __init__(self, code):
        self.code = code
        if code in copy_reg._inverted_registry:
            self.pair = copy_reg._inverted_registry[code]
            copy_reg.remove_extension(self.pair[0], self.pair[1], code)
        else:
            self.pair = None

    # Restore previous registration for code.
    def restore(self):
        code = self.code
        # Drop whatever is registered for code right now, so that
        # re-adding the saved pair below starts from an empty slot.
        curpair = copy_reg._inverted_registry.get(code)
        if curpair is not None:
            copy_reg.remove_extension(curpair[0], curpair[1], code)
        pair = self.pair
        if pair is not None:
            copy_reg.add_extension(pair[0], pair[1], code)

# Classic class whose instances compare by their attribute dictionaries.
class C:
    def __cmp__(self, other):
        return cmp(self.__dict__, other.__dict__)

# Publish C as __main__.C; the canned pickles below refer to it by the
# global name "__main__ C".
import __main__
__main__.C = C
C.__module__ = "__main__"

# int subclass that also stores str(x) in an instance attribute.
class myint(int):
    def __init__(self, x):
        self.str = str(x)

# Subclass of C that supplies its constructor arguments via
# __getinitargs__.
class initarg(C):

    def __init__(self, a, b):
        self.a = a
        self.b = b

    def __getinitargs__(self):
        return self.a, self.b

class metaclass(type):
    pass

class use_metaclass(object):
    __metaclass__ = metaclass

# DATA0 .. DATA2 are the pickles we expect under the various protocols, for
# the object returned by create_data().

# break into multiple strings to avoid confusing font-lock-mode
DATA0 = """(lp1
I0
aL1L
aF2
ac__builtin__
complex
p2
""" + \
"""(F3
F0
tRp3
aI1
aI-1
aI255
aI-255
aI-256
aI65535
aI-65535
aI-65536
aI2147483647
aI-2147483647
aI-2147483648
a""" + \
"""(S'abc'
p4
g4
""" + \
"""(i__main__
C
p5
""" + \
"""(dp6
S'foo'
p7
I1
sS'bar'
p8
I2
sbg5
tp9
ag9
aI5
a.
"""

# Disassembly of DATA0.
# Expected pickletools.dis() output for the protocol 0 pickle.  Each line
# is "<offset>: <opcode char> <opcode name> <argument>"; opcodes emitted
# while a MARK is open are indented one extra level per open MARK.
DATA0_DIS = """\
    0: (    MARK
    1: l        LIST       (MARK at 0)
    2: p    PUT        1
    5: I    INT        0
    8: a    APPEND
    9: L    LONG       1L
   13: a    APPEND
   14: F    FLOAT      2.0
   17: a    APPEND
   18: c    GLOBAL     '__builtin__ complex'
   39: p    PUT        2
   42: (    MARK
   43: F        FLOAT      3.0
   46: F        FLOAT      0.0
   49: t        TUPLE      (MARK at 42)
   50: R    REDUCE
   51: p    PUT        3
   54: a    APPEND
   55: I    INT        1
   58: a    APPEND
   59: I    INT        -1
   63: a    APPEND
   64: I    INT        255
   69: a    APPEND
   70: I    INT        -255
   76: a    APPEND
   77: I    INT        -256
   83: a    APPEND
   84: I    INT        65535
   91: a    APPEND
   92: I    INT        -65535
  100: a    APPEND
  101: I    INT        -65536
  109: a    APPEND
  110: I    INT        2147483647
  122: a    APPEND
  123: I    INT        -2147483647
  136: a    APPEND
  137: I    INT        -2147483648
  150: a    APPEND
  151: (    MARK
  152: S        STRING     'abc'
  159: p        PUT        4
  162: g        GET        4
  165: (        MARK
  166: i            INST       '__main__ C' (MARK at 165)
  178: p        PUT        5
  181: (        MARK
  182: d            DICT       (MARK at 181)
  183: p        PUT        6
  186: S        STRING     'foo'
  193: p        PUT        7
  196: I        INT        1
  199: s        SETITEM
  200: S        STRING     'bar'
  207: p        PUT        8
  210: I        INT        2
  213: s        SETITEM
  214: b        BUILD
  215: g        GET        5
  218: t        TUPLE      (MARK at 151)
  219: p    PUT        9
  222: a    APPEND
  223: g    GET        9
  226: a    APPEND
  227: I    INT        5
  230: a    APPEND
  231: .    STOP
highest protocol among opcodes = 0
"""

# Canned protocol 1 pickle of the create_data() object (175 bytes; the
# STOP opcode sits at offset 174).
DATA1 = (']q\x01(K\x00L1L\nG@\x00\x00\x00\x00\x00\x00\x00'
         'c__builtin__\ncomplex\nq\x02(G@\x08\x00\x00\x00\x00\x00'
         '\x00G\x00\x00\x00\x00\x00\x00\x00\x00tRq\x03K\x01J\xff\xff'
         '\xff\xffK\xffJ\x01\xff\xff\xffJ\x00\xff\xff\xffM\xff\xff'
         'J\x01\x00\xff\xffJ\x00\x00\xff\xffJ\xff\xff\xff\x7fJ\x01\x00'
         '\x00\x80J\x00\x00\x00\x80(U\x03abcq\x04h\x04(c__main__\n'
         'C\nq\x05oq\x06}q\x07(U\x03fooq\x08K\x01U\x03barq\tK\x02ubh'
         '\x06tq\nh\nK\x05e.'
        )

# Disassembly of DATA1.
# Expected pickletools.dis() output for the protocol 1 pickle.  Opcodes
# emitted while a MARK is open are indented one extra level per open MARK.
DATA1_DIS = """\
    0: ]    EMPTY_LIST
    1: q    BINPUT     1
    3: (    MARK
    4: K        BININT1    0
    6: L        LONG       1L
   10: G        BINFLOAT   2.0
   19: c        GLOBAL     '__builtin__ complex'
   40: q        BINPUT     2
   42: (        MARK
   43: G            BINFLOAT   3.0
   52: G            BINFLOAT   0.0
   61: t            TUPLE      (MARK at 42)
   62: R        REDUCE
   63: q        BINPUT     3
   65: K        BININT1    1
   67: J        BININT     -1
   72: K        BININT1    255
   74: J        BININT     -255
   79: J        BININT     -256
   84: M        BININT2    65535
   87: J        BININT     -65535
   92: J        BININT     -65536
   97: J        BININT     2147483647
  102: J        BININT     -2147483647
  107: J        BININT     -2147483648
  112: (        MARK
  113: U            SHORT_BINSTRING 'abc'
  118: q            BINPUT     4
  120: h            BINGET     4
  122: (            MARK
  123: c                GLOBAL     '__main__ C'
  135: q                BINPUT     5
  137: o                OBJ        (MARK at 122)
  138: q            BINPUT     6
  140: }            EMPTY_DICT
  141: q            BINPUT     7
  143: (            MARK
  144: U                SHORT_BINSTRING 'foo'
  149: q                BINPUT     8
  151: K                BININT1    1
  153: U                SHORT_BINSTRING 'bar'
  158: q                BINPUT     9
  160: K                BININT1    2
  162: u                SETITEMS   (MARK at 143)
  163: b            BUILD
  164: h            BINGET     6
  166: t            TUPLE      (MARK at 112)
  167: q        BINPUT     10
  169: h        BINGET     10
  171: K        BININT1    5
  173: e        APPENDS    (MARK at 3)
  174: .    STOP
highest protocol among opcodes = 1
"""

# Canned protocol 2 pickle of the create_data() object (175 bytes; starts
# with the PROTO opcode, STOP sits at offset 174).
DATA2 = ('\x80\x02]q\x01(K\x00\x8a\x01\x01G@\x00\x00\x00\x00\x00\x00\x00'
         'c__builtin__\ncomplex\nq\x02G@\x08\x00\x00\x00\x00\x00\x00G\x00'
         '\x00\x00\x00\x00\x00\x00\x00\x86Rq\x03K\x01J\xff\xff\xff\xffK'
         '\xffJ\x01\xff\xff\xffJ\x00\xff\xff\xffM\xff\xffJ\x01\x00\xff\xff'
         'J\x00\x00\xff\xffJ\xff\xff\xff\x7fJ\x01\x00\x00\x80J\x00\x00\x00'
         '\x80(U\x03abcq\x04h\x04(c__main__\nC\nq\x05oq\x06}q\x07(U\x03foo'
         'q\x08K\x01U\x03barq\tK\x02ubh\x06tq\nh\nK\x05e.')

# Disassembly of DATA2.
0287 DATA2_DIS = """\ 0288 0: \x80 PROTO 2 0289 2: ] EMPTY_LIST 0290 3: q BINPUT 1 0291 5: ( MARK 0292 6: K BININT1 0 0293 8: \x8a LONG1 1L 0294 11: G BINFLOAT 2.0 0295 20: c GLOBAL '__builtin__ complex' 0296 41: q BINPUT 2 0297 43: G BINFLOAT 3.0 0298 52: G BINFLOAT 0.0 0299 61: \x86 TUPLE2 0300 62: R REDUCE 0301 63: q BINPUT 3 0302 65: K BININT1 1 0303 67: J BININT -1 0304 72: K BININT1 255 0305 74: J BININT -255 0306 79: J BININT -256 0307 84: M BININT2 65535 0308 87: J BININT -65535 0309 92: J BININT -65536 0310 97: J BININT 2147483647 0311 102: J BININT -2147483647 0312 107: J BININT -2147483648 0313 112: ( MARK 0314 113: U SHORT_BINSTRING 'abc' 0315 118: q BINPUT 4 0316 120: h BINGET 4 0317 122: ( MARK 0318 123: c GLOBAL '__main__ C' 0319 135: q BINPUT 5 0320 137: o OBJ (MARK at 122) 0321 138: q BINPUT 6 0322 140: } EMPTY_DICT 0323 141: q BINPUT 7 0324 143: ( MARK 0325 144: U SHORT_BINSTRING 'foo' 0326 149: q BINPUT 8 0327 151: K BININT1 1 0328 153: U SHORT_BINSTRING 'bar' 0329 158: q BINPUT 9 0330 160: K BININT1 2 0331 162: u SETITEMS (MARK at 143) 0332 163: b BUILD 0333 164: h BINGET 6 0334 166: t TUPLE (MARK at 112) 0335 167: q BINPUT 10 0336 169: h BINGET 10 0337 171: K BININT1 5 0338 173: e APPENDS (MARK at 5) 0339 174: . STOP 0340 highest protocol among opcodes = 2 0341 """ 0342 0343 def create_data(): 0344 c = C() 0345 c.foo = 1 0346 c.bar = 2 0347 x = [0, 1L, 2.0, 3.0+0j] 0348 # Append some integer test cases at cPickle.c's internal size 0349 # cutoffs. 0350 uint1max = 0xff 0351 uint2max = 0xffff 0352 int4max = 0x7fffffff 0353 x.extend([1, -1, 0354 uint1max, -uint1max, -uint1max-1, 0355 uint2max, -uint2max, -uint2max-1, 0356 int4max, -int4max, -int4max-1]) 0357 y = ('abc', 'abc', c, c) 0358 x.append(y) 0359 x.append(y) 0360 x.append(5) 0361 return x 0362 0363 class AbstractPickleTests(unittest.TestCase): 0364 # Subclass must define self.dumps, self.loads, self.error. 
0365 0366 _testdata = create_data() 0367 0368 def setUp(self): 0369 pass 0370 0371 def test_misc(self): 0372 # test various datatypes not tested by testdata 0373 for proto in protocols: 0374 x = myint(4) 0375 s = self.dumps(x, proto) 0376 y = self.loads(s) 0377 self.assertEqual(x, y) 0378 0379 x = (1, ()) 0380 s = self.dumps(x, proto) 0381 y = self.loads(s) 0382 self.assertEqual(x, y) 0383 0384 x = initarg(1, x) 0385 s = self.dumps(x, proto) 0386 y = self.loads(s) 0387 self.assertEqual(x, y) 0388 0389 # XXX test __reduce__ protocol? 0390 0391 def test_roundtrip_equality(self): 0392 expected = self._testdata 0393 for proto in protocols: 0394 s = self.dumps(expected, proto) 0395 got = self.loads(s) 0396 self.assertEqual(expected, got) 0397 0398 def test_load_from_canned_string(self): 0399 expected = self._testdata 0400 for canned in DATA0, DATA1, DATA2: 0401 got = self.loads(canned) 0402 self.assertEqual(expected, got) 0403 0404 # There are gratuitous differences between pickles produced by 0405 # pickle and cPickle, largely because cPickle starts PUT indices at 0406 # 1 and pickle starts them at 0. See XXX comment in cPickle's put2() -- 0407 # there's a comment with an exclamation point there whose meaning 0408 # is a mystery. cPickle also suppresses PUT for objects with a refcount 0409 # of 1. 
0410 def dont_test_disassembly(self): 0411 from cStringIO import StringIO 0412 from pickletools import dis 0413 0414 for proto, expected in (0, DATA0_DIS), (1, DATA1_DIS): 0415 s = self.dumps(self._testdata, proto) 0416 filelike = StringIO() 0417 dis(s, out=filelike) 0418 got = filelike.getvalue() 0419 self.assertEqual(expected, got) 0420 0421 def test_recursive_list(self): 0422 l = [] 0423 l.append(l) 0424 for proto in protocols: 0425 s = self.dumps(l, proto) 0426 x = self.loads(s) 0427 self.assertEqual(len(x), 1) 0428 self.assert_(x is x[0]) 0429 0430 def test_recursive_dict(self): 0431 d = {} 0432 d[1] = d 0433 for proto in protocols: 0434 s = self.dumps(d, proto) 0435 x = self.loads(s) 0436 self.assertEqual(x.keys(), [1]) 0437 self.assert_(x[1] is x) 0438 0439 def test_recursive_inst(self): 0440 i = C() 0441 i.attr = i 0442 for proto in protocols: 0443 s = self.dumps(i, 2) 0444 x = self.loads(s) 0445 self.assertEqual(dir(x), dir(i)) 0446 self.assert_(x.attr is x) 0447 0448 def test_recursive_multi(self): 0449 l = [] 0450 d = {1:l} 0451 i = C() 0452 i.attr = d 0453 l.append(i) 0454 for proto in protocols: 0455 s = self.dumps(l, proto) 0456 x = self.loads(s) 0457 self.assertEqual(len(x), 1) 0458 self.assertEqual(dir(x[0]), dir(i)) 0459 self.assertEqual(x[0].attr.keys(), [1]) 0460 self.assert_(x[0].attr[1] is x) 0461 0462 def test_garyp(self): 0463 self.assertRaises(self.error, self.loads, 'garyp') 0464 0465 def test_insecure_strings(self): 0466 insecure = ["abc", "2 + 2", # not quoted 0467 #"'abc' + 'def'", # not a single quoted string 0468 "'abc", # quote is not closed 0469 "'abc\"", # open quote and close quote don't match 0470 "'abc' ?", # junk after close quote 0471 "'\\'", # trailing backslash 0472 # some tests of the quoting rules 0473 #"'abc\"\''", 0474 #"'\\\\a\'\'\'\\\'\\\\\''", 0475 ] 0476 for s in insecure: 0477 buf = "S" + s + "\012p0\012." 
0478 self.assertRaises(ValueError, self.loads, buf) 0479 0480 if have_unicode: 0481 def test_unicode(self): 0482 endcases = [unicode(''), unicode('<\\u>'), unicode('<\\\u1234>'), 0483 unicode('<\n>'), unicode('<\\>')] 0484 for proto in protocols: 0485 for u in endcases: 0486 p = self.dumps(u, proto) 0487 u2 = self.loads(p) 0488 self.assertEqual(u2, u) 0489 0490 def test_ints(self): 0491 import sys 0492 for proto in protocols: 0493 n = sys.maxint 0494 while n: 0495 for expected in (-n, n): 0496 s = self.dumps(expected, proto) 0497 n2 = self.loads(s) 0498 self.assertEqual(expected, n2) 0499 n = n >> 1 0500 0501 def test_maxint64(self): 0502 maxint64 = (1L << 63) - 1 0503 data = 'I' + str(maxint64) + '\n.' 0504 got = self.loads(data) 0505 self.assertEqual(got, maxint64) 0506 0507 # Try too with a bogus literal. 0508 data = 'I' + str(maxint64) + 'JUNK\n.' 0509 self.assertRaises(ValueError, self.loads, data) 0510 0511 def test_long(self): 0512 for proto in protocols: 0513 # 256 bytes is where LONG4 begins. 0514 for nbits in 1, 8, 8*254, 8*255, 8*256, 8*257: 0515 nbase = 1L << nbits 0516 for npos in nbase-1, nbase, nbase+1: 0517 for n in npos, -npos: 0518 pickle = self.dumps(n, proto) 0519 got = self.loads(pickle) 0520 self.assertEqual(n, got) 0521 # Try a monster. This is quadratic-time in protos 0 & 1, so don't 0522 # bother with those. 
0523 nbase = long("deadbeeffeedface", 16) 0524 nbase += nbase << 1000000 0525 for n in nbase, -nbase: 0526 p = self.dumps(n, 2) 0527 got = self.loads(p) 0528 self.assertEqual(n, got) 0529 0530 def test_reduce(self): 0531 pass 0532 0533 def test_getinitargs(self): 0534 pass 0535 0536 def test_metaclass(self): 0537 a = use_metaclass() 0538 for proto in protocols: 0539 s = self.dumps(a, proto) 0540 b = self.loads(s) 0541 self.assertEqual(a.__class__, b.__class__) 0542 0543 def test_structseq(self): 0544 import time 0545 import os 0546 0547 t = time.localtime() 0548 for proto in protocols: 0549 s = self.dumps(t, proto) 0550 u = self.loads(s) 0551 self.assertEqual(t, u) 0552 if hasattr(os, "stat"): 0553 t = os.stat(os.curdir) 0554 s = self.dumps(t, proto) 0555 u = self.loads(s) 0556 self.assertEqual(t, u) 0557 if hasattr(os, "statvfs"): 0558 t = os.statvfs(os.curdir) 0559 s = self.dumps(t, proto) 0560 u = self.loads(s) 0561 self.assertEqual(t, u) 0562 0563 # Tests for protocol 2 0564 0565 def test_proto(self): 0566 build_none = pickle.NONE + pickle.STOP 0567 for proto in protocols: 0568 expected = build_none 0569 if proto >= 2: 0570 expected = pickle.PROTO + chr(proto) + expected 0571 p = self.dumps(None, proto) 0572 self.assertEqual(p, expected) 0573 0574 oob = protocols[-1] + 1 # a future protocol 0575 badpickle = pickle.PROTO + chr(oob) + build_none 0576 try: 0577 self.loads(badpickle) 0578 except ValueError, detail: 0579 self.failUnless(str(detail).startswith( 0580 "unsupported pickle protocol")) 0581 else: 0582 self.fail("expected bad protocol number to raise ValueError") 0583 0584 def test_long1(self): 0585 x = 12345678910111213141516178920L 0586 for proto in protocols: 0587 s = self.dumps(x, proto) 0588 y = self.loads(s) 0589 self.assertEqual(x, y) 0590 self.assertEqual(opcode_in_pickle(pickle.LONG1, s), proto >= 2) 0591 0592 def test_long4(self): 0593 x = 12345678910111213141516178920L << (256*8) 0594 for proto in protocols: 0595 s = self.dumps(x, proto) 0596 y 
= self.loads(s) 0597 self.assertEqual(x, y) 0598 self.assertEqual(opcode_in_pickle(pickle.LONG4, s), proto >= 2) 0599 0600 def test_short_tuples(self): 0601 # Map (proto, len(tuple)) to expected opcode. 0602 expected_opcode = {(0, 0): pickle.TUPLE, 0603 (0, 1): pickle.TUPLE, 0604 (0, 2): pickle.TUPLE, 0605 (0, 3): pickle.TUPLE, 0606 (0, 4): pickle.TUPLE, 0607 0608 (1, 0): pickle.EMPTY_TUPLE, 0609 (1, 1): pickle.TUPLE, 0610 (1, 2): pickle.TUPLE, 0611 (1, 3): pickle.TUPLE, 0612 (1, 4): pickle.TUPLE, 0613 0614 (2, 0): pickle.EMPTY_TUPLE, 0615 (2, 1): pickle.TUPLE1, 0616 (2, 2): pickle.TUPLE2, 0617 (2, 3): pickle.TUPLE3, 0618 (2, 4): pickle.TUPLE, 0619 } 0620 a = () 0621 b = (1,) 0622 c = (1, 2) 0623 d = (1, 2, 3) 0624 e = (1, 2, 3, 4) 0625 for proto in protocols: 0626 for x in a, b, c, d, e: 0627 s = self.dumps(x, proto) 0628 y = self.loads(s) 0629 self.assertEqual(x, y, (proto, x, s, y)) 0630 expected = expected_opcode[proto, len(x)] 0631 self.assertEqual(opcode_in_pickle(expected, s), True) 0632 0633 def test_singletons(self): 0634 # Map (proto, singleton) to expected opcode. 
0635 expected_opcode = {(0, None): pickle.NONE, 0636 (1, None): pickle.NONE, 0637 (2, None): pickle.NONE, 0638 0639 (0, True): pickle.INT, 0640 (1, True): pickle.INT, 0641 (2, True): pickle.NEWTRUE, 0642 0643 (0, False): pickle.INT, 0644 (1, False): pickle.INT, 0645 (2, False): pickle.NEWFALSE, 0646 } 0647 for proto in protocols: 0648 for x in None, False, True: 0649 s = self.dumps(x, proto) 0650 y = self.loads(s) 0651 self.assert_(x is y, (proto, x, s, y)) 0652 expected = expected_opcode[proto, x] 0653 self.assertEqual(opcode_in_pickle(expected, s), True) 0654 0655 def test_newobj_tuple(self): 0656 x = MyTuple([1, 2, 3]) 0657 x.foo = 42 0658 x.bar = "hello" 0659 for proto in protocols: 0660 s = self.dumps(x, proto) 0661 y = self.loads(s) 0662 self.assertEqual(tuple(x), tuple(y)) 0663 self.assertEqual(x.__dict__, y.__dict__) 0664 0665 def test_newobj_list(self): 0666 x = MyList([1, 2, 3]) 0667 x.foo = 42 0668 x.bar = "hello" 0669 for proto in protocols: 0670 s = self.dumps(x, proto) 0671 y = self.loads(s) 0672 self.assertEqual(list(x), list(y)) 0673 self.assertEqual(x.__dict__, y.__dict__) 0674 0675 def test_newobj_generic(self): 0676 for proto in protocols: 0677 for C in myclasses: 0678 B = C.__base__ 0679 x = C(C.sample) 0680 x.foo = 42 0681 s = self.dumps(x, proto) 0682 y = self.loads(s) 0683 detail = (proto, C, B, x, y, type(y)) 0684 self.assertEqual(B(x), B(y), detail) 0685 self.assertEqual(x.__dict__, y.__dict__, detail) 0686 0687 # Register a type with copy_reg, with extension code extcode. Pickle 0688 # an object of that type. Check that the resulting pickle uses opcode 0689 # (EXT[124]) under proto 2, and not in proto 1. 0690 0691 def produce_global_ext(self, extcode, opcode): 0692 e = ExtensionSaver(extcode) 0693 try: 0694 copy_reg.add_extension(__name__, "MyList", extcode) 0695 x = MyList([1, 2, 3]) 0696 x.foo = 42 0697 x.bar = "hello" 0698 0699 # Dump using protocol 1 for comparison. 
0700 s1 = self.dumps(x, 1) 0701 self.assert_(__name__ in s1) 0702 self.assert_("MyList" in s1) 0703 self.assertEqual(opcode_in_pickle(opcode, s1), False) 0704 0705 y = self.loads(s1) 0706 self.assertEqual(list(x), list(y)) 0707 self.assertEqual(x.__dict__, y.__dict__) 0708 0709 # Dump using protocol 2 for test. 0710 s2 = self.dumps(x, 2) 0711 self.assert_(__name__ not in s2) 0712 self.assert_("MyList" not in s2) 0713 self.assertEqual(opcode_in_pickle(opcode, s2), True) 0714 0715 y = self.loads(s2) 0716 self.assertEqual(list(x), list(y)) 0717 self.assertEqual(x.__dict__, y.__dict__) 0718 0719 finally: 0720 e.restore() 0721 0722 def test_global_ext1(self): 0723 self.produce_global_ext(0x00000001, pickle.EXT1) # smallest EXT1 code 0724 self.produce_global_ext(0x000000ff, pickle.EXT1) # largest EXT1 code 0725 0726 def test_global_ext2(self): 0727 self.produce_global_ext(0x00000100, pickle.EXT2) # smallest EXT2 code 0728 self.produce_global_ext(0x0000ffff, pickle.EXT2) # largest EXT2 code 0729 self.produce_global_ext(0x0000abcd, pickle.EXT2) # check endianness 0730 0731 def test_global_ext4(self): 0732 self.produce_global_ext(0x00010000, pickle.EXT4) # smallest EXT4 code 0733 self.produce_global_ext(0x7fffffff, pickle.EXT4) # largest EXT4 code 0734 self.produce_global_ext(0x12abcdef, pickle.EXT4) # check endianness 0735 0736 def test_list_chunking(self): 0737 n = 10 # too small to chunk 0738 x = range(n) 0739 for proto in protocols: 0740 s = self.dumps(x, proto) 0741 y = self.loads(s) 0742 self.assertEqual(x, y) 0743 num_appends = count_opcode(pickle.APPENDS, s) 0744 self.assertEqual(num_appends, proto > 0) 0745 0746 n = 2500 # expect at least two chunks when proto > 0 0747 x = range(n) 0748 for proto in protocols: 0749 s = self.dumps(x, proto) 0750 y = self.loads(s) 0751 self.assertEqual(x, y) 0752 num_appends = count_opcode(pickle.APPENDS, s) 0753 if proto == 0: 0754 self.assertEqual(num_appends, 0) 0755 else: 0756 self.failUnless(num_appends >= 2) 0757 0758 def 
test_dict_chunking(self): 0759 n = 10 # too small to chunk 0760 x = dict.fromkeys(range(n)) 0761 for proto in protocols: 0762 s = self.dumps(x, proto) 0763 y = self.loads(s) 0764 self.assertEqual(x, y) 0765 num_setitems = count_opcode(pickle.SETITEMS, s) 0766 self.assertEqual(num_setitems, proto > 0) 0767 0768 n = 2500 # expect at least two chunks when proto > 0 0769 x = dict.fromkeys(range(n)) 0770 for proto in protocols: 0771 s = self.dumps(x, proto) 0772 y = self.loads(s) 0773 self.assertEqual(x, y) 0774 num_setitems = count_opcode(pickle.SETITEMS, s) 0775 if proto == 0: 0776 self.assertEqual(num_setitems, 0) 0777 else: 0778 self.failUnless(num_setitems >= 2) 0779 0780 def test_simple_newobj(self): 0781 x = object.__new__(SimpleNewObj) # avoid __init__ 0782 x.abc = 666 0783 for proto in protocols: 0784 s = self.dumps(x, proto) 0785 self.assertEqual(opcode_in_pickle(pickle.NEWOBJ, s), proto >= 2) 0786 y = self.loads(s) # will raise TypeError if __init__ called 0787 self.assertEqual(y.abc, 666) 0788 self.assertEqual(x.__dict__, y.__dict__) 0789 0790 def test_newobj_list_slots(self): 0791 x = SlotList([1, 2, 3]) 0792 x.foo = 42 0793 x.bar = "hello" 0794 s = self.dumps(x, 2) 0795 y = self.loads(s) 0796 self.assertEqual(list(x), list(y)) 0797 self.assertEqual(x.__dict__, y.__dict__) 0798 self.assertEqual(x.foo, y.foo) 0799 self.assertEqual(x.bar, y.bar) 0800 0801 def test_reduce_overrides_default_reduce_ex(self): 0802 for proto in 0, 1, 2: 0803 x = REX_one() 0804 self.assertEqual(x._reduce_called, 0) 0805 s = self.dumps(x, proto) 0806 self.assertEqual(x._reduce_called, 1) 0807 y = self.loads(s) 0808 self.assertEqual(y._reduce_called, 0) 0809 0810 def test_reduce_ex_called(self): 0811 for proto in 0, 1, 2: 0812 x = REX_two() 0813 self.assertEqual(x._proto, None) 0814 s = self.dumps(x, proto) 0815 self.assertEqual(x._proto, proto) 0816 y = self.loads(s) 0817 self.assertEqual(y._proto, None) 0818 0819 def test_reduce_ex_overrides_reduce(self): 0820 for proto in 0, 1, 2: 
0821 x = REX_three() 0822 self.assertEqual(x._proto, None) 0823 s = self.dumps(x, proto) 0824 self.assertEqual(x._proto, proto) 0825 y = self.loads(s) 0826 self.assertEqual(y._proto, None) 0827 0828 # Test classes for reduce_ex 0829 0830 class REX_one(object): 0831 _reduce_called = 0 0832 def __reduce__(self): 0833 self._reduce_called = 1 0834 return REX_one, () 0835 # No __reduce_ex__ here, but inheriting it from object 0836 0837 class REX_two(object): 0838 _proto = None 0839 def __reduce_ex__(self, proto): 0840 self._proto = proto 0841 return REX_two, () 0842 # No __reduce__ here, but inheriting it from object 0843 0844 class REX_three(object): 0845 _proto = None 0846 def __reduce_ex__(self, proto): 0847 self._proto = proto 0848 return REX_two, () 0849 def __reduce__(self): 0850 raise TestFailed, "This __reduce__ shouldn't be called" 0851 0852 # Test classes for newobj 0853 0854 class MyInt(int): 0855 sample = 1 0856 0857 class MyLong(long): 0858 sample = 1L 0859 0860 class MyFloat(float): 0861 sample = 1.0 0862 0863 class MyComplex(complex): 0864 sample = 1.0 + 0.0j 0865 0866 class MyStr(str): 0867 sample = "hello" 0868 0869 class MyUnicode(unicode): 0870 sample = u"hello \u1234" 0871 0872 class MyTuple(tuple): 0873 sample = (1, 2, 3) 0874 0875 class MyList(list): 0876 sample = [1, 2, 3] 0877 0878 class MyDict(dict): 0879 sample = {"a": 1, "b": 2} 0880 0881 myclasses = [MyInt, MyLong, MyFloat, 0882 MyComplex, 0883 MyStr, MyUnicode, 0884 MyTuple, MyList, MyDict] 0885 0886 0887 class SlotList(MyList): 0888 __slots__ = ["foo"] 0889 0890 class SimpleNewObj(object): 0891 def __init__(self, a, b, c): 0892 # raise an error, to make sure this isn't called 0893 raise TypeError("SimpleNewObj.__init__() didn't expect to get called") 0894 0895 class AbstractPickleModuleTests(unittest.TestCase): 0896 0897 def test_dump_closed_file(self): 0898 import os 0899 f = open(TESTFN, "w") 0900 try: 0901 f.close() 0902 self.assertRaises(ValueError, self.module.dump, 123, f) 0903 
finally: 0904 os.remove(TESTFN) 0905 0906 def test_load_closed_file(self): 0907 import os 0908 f = open(TESTFN, "w") 0909 try: 0910 f.close() 0911 self.assertRaises(ValueError, self.module.dump, 123, f) 0912 finally: 0913 os.remove(TESTFN) 0914 0915 def test_highest_protocol(self): 0916 # Of course this needs to be changed when HIGHEST_PROTOCOL changes. 0917 self.assertEqual(self.module.HIGHEST_PROTOCOL, 2) 0918 0919 def test_callapi(self): 0920 from cStringIO import StringIO 0921 f = StringIO() 0922 # With and without keyword arguments 0923 self.module.dump(123, f, -1) 0924 self.module.dump(123, file=f, protocol=-1) 0925 self.module.dumps(123, -1) 0926 self.module.dumps(123, protocol=-1) 0927 self.module.Pickler(f, -1) 0928 self.module.Pickler(f, protocol=-1) 0929 0930 class AbstractPersistentPicklerTests(unittest.TestCase): 0931 0932 # This class defines persistent_id() and persistent_load() 0933 # functions that should be used by the pickler. All even integers 0934 # are pickled using persistent ids. 0935 0936 def persistent_id(self, object): 0937 if isinstance(object, int) and object % 2 == 0: 0938 self.id_count += 1 0939 return str(object) 0940 else: 0941 return None 0942 0943 def persistent_load(self, oid): 0944 self.load_count += 1 0945 object = int(oid) 0946 assert object % 2 == 0 0947 return object 0948 0949 def test_persistence(self): 0950 self.id_count = 0 0951 self.load_count = 0 0952 L = range(10) 0953 self.assertEqual(self.loads(self.dumps(L)), L) 0954 self.assertEqual(self.id_count, 5) 0955 self.assertEqual(self.load_count, 5) 0956 0957 def test_bin_persistence(self): 0958 self.id_count = 0 0959 self.load_count = 0 0960 L = range(10) 0961 self.assertEqual(self.loads(self.dumps(L, 1)), L) 0962 self.assertEqual(self.id_count, 5) 0963 self.assertEqual(self.load_count, 5) 0964
# Generated by PyXR 0.9.4