"""Regression tests for the weakref module.

Covers plain weak references, proxies, subclasses of weakref.ref, and the
WeakKeyDictionary / WeakValueDictionary mapping types, including a number
of historical interactions between weakref callbacks and cyclic gc.
"""

import gc
import sys
import unittest
import UserList
import weakref

from test import test_support


class C:
    """Minimal old-style class used as a weakly referenceable target."""

    def method(self):
        pass


class Callable:
    """Callable target; calling an instance stores the argument in ``bar``."""

    bar = None

    def __call__(self, x):
        self.bar = x


def create_function():
    """Return a freshly created function object."""
    def f(): pass
    return f

def create_bound_method():
    """Return a bound method of a new C instance."""
    return C().method

def create_unbound_method():
    """Return C.method accessed through the class."""
    return C.method


class TestBase(unittest.TestCase):
    """Common fixture: counts how often ``callback`` has been invoked."""

    def setUp(self):
        self.cbcalled = 0

    def callback(self, ref):
        # Weakref callback; only records that it was called.
        self.cbcalled += 1


class ReferencesTestCase(TestBase):
    """Tests for weakref.ref, weakref.proxy and the introspection helpers."""

    def test_basic_ref(self):
        self.check_basic_ref(C)
        self.check_basic_ref(create_function)
        self.check_basic_ref(create_bound_method)
        self.check_basic_ref(create_unbound_method)

        # Just make sure the tp_repr handler doesn't raise an exception.
        # Live reference:
        o = C()
        wr = weakref.ref(o)
        repr(wr)
        # Dead reference:
        del o
        repr(wr)

    def test_basic_callback(self):
        self.check_basic_callback(C)
        self.check_basic_callback(create_function)
        self.check_basic_callback(create_bound_method)
        self.check_basic_callback(create_unbound_method)

    def test_multiple_callbacks(self):
        o = C()
        ref1 = weakref.ref(o, self.callback)
        ref2 = weakref.ref(o, self.callback)
        del o
        self.assert_(ref1() is None,
                     "expected reference to be invalidated")
        self.assert_(ref2() is None,
                     "expected reference to be invalidated")
        self.assert_(self.cbcalled == 2,
                     "callback not called the right number of times")

    def test_multiple_selfref_callbacks(self):
        # Make sure all references are invalidated before callbacks are called
        #
        # What's important here is that we're using the first
        # reference in the callback invoked on the second reference
        # (the most recently created ref is cleaned up first).  This
        # tests that all references to the object are invalidated
        # before any of the callbacks are invoked, so that we only
        # have one invocation of _weakref.c:cleanup_helper() active
        # for a particular object at a time.
        #
        def callback(object, self=self):
            self.ref()
        c = C()
        self.ref = weakref.ref(c, callback)
        ref1 = weakref.ref(c, callback)
        del c

    def test_proxy_ref(self):
        o = C()
        o.bar = 1
        ref1 = weakref.proxy(o, self.callback)
        ref2 = weakref.proxy(o, self.callback)
        del o

        def check(proxy):
            proxy.bar

        self.assertRaises(weakref.ReferenceError, check, ref1)
        self.assertRaises(weakref.ReferenceError, check, ref2)
        self.assertRaises(weakref.ReferenceError, bool, weakref.proxy(C()))
        self.assert_(self.cbcalled == 2)

    def check_basic_ref(self, factory):
        # Helper: a ref to a live object created by factory() must
        # dereference to that very object.
        o = factory()
        ref = weakref.ref(o)
        self.assert_(ref() is not None,
                     "weak reference to live object should be live")
        o2 = ref()
        self.assert_(o is o2,
                     "<ref>() should return original object if live")

    def check_basic_callback(self, factory):
        # Helper: dropping the only strong reference must fire the
        # callback exactly once and leave the ref dead.
        self.cbcalled = 0
        o = factory()
        ref = weakref.ref(o, self.callback)
        del o
        self.assert_(self.cbcalled == 1,
                     "callback did not properly set 'cbcalled'")
        self.assert_(ref() is None,
                     "ref2 should be dead after deleting object reference")

    def test_ref_reuse(self):
        o = C()
        ref1 = weakref.ref(o)
        # create a proxy to make sure that there's an intervening creation
        # between these two; it should make no difference
        proxy = weakref.proxy(o)
        ref2 = weakref.ref(o)
        self.assert_(ref1 is ref2,
                     "reference object w/out callback should be re-used")

        o = C()
        proxy = weakref.proxy(o)
        ref1 = weakref.ref(o)
        ref2 = weakref.ref(o)
        self.assert_(ref1 is ref2,
                     "reference object w/out callback should be re-used")
        self.assert_(weakref.getweakrefcount(o) == 2,
                     "wrong weak ref count for object")
        del proxy
        self.assert_(weakref.getweakrefcount(o) == 1,
                     "wrong weak ref count for object after deleting proxy")

    def test_proxy_reuse(self):
        o = C()
        proxy1 = weakref.proxy(o)
        ref = weakref.ref(o)
        proxy2 = weakref.proxy(o)
        self.assert_(proxy1 is proxy2,
                     "proxy object w/out callback should have been re-used")

    def test_basic_proxy(self):
        o = C()
        self.check_proxy(o, weakref.proxy(o))

        L = UserList.UserList()
        p = weakref.proxy(L)
        self.failIf(p, "proxy for empty UserList should be false")
        p.append(12)
        self.assertEqual(len(L), 1)
        self.failUnless(p, "proxy for non-empty UserList should be true")
        p[:] = [2, 3]
        self.assertEqual(len(L), 2)
        self.assertEqual(len(p), 2)
        self.failUnless(3 in p,
                        "proxy didn't support __contains__() properly")
        p[1] = 5
        self.assertEqual(L[1], 5)
        self.assertEqual(p[1], 5)
        L2 = UserList.UserList(L)
        p2 = weakref.proxy(L2)
        self.assertEqual(p, p2)
        ## self.assertEqual(repr(L2), repr(p2))
        L3 = UserList.UserList(range(10))
        p3 = weakref.proxy(L3)
        self.assertEqual(L3[:], p3[:])
        self.assertEqual(L3[5:], p3[5:])
        self.assertEqual(L3[:5], p3[:5])
        self.assertEqual(L3[2:5], p3[2:5])

    # The PyWeakref_* C API is documented as allowing either NULL or
    # None as the value for the callback, where either means "no
    # callback".  The "no callback" ref and proxy objects are supposed
    # to be shared so long as they exist by all callers so long as
    # they are active.  In Python 2.3.3 and earlier, this guarantee
    # was not honored, and was broken in different ways for
    # PyWeakref_NewRef() and PyWeakref_NewProxy().  (Two tests.)

    def test_shared_ref_without_callback(self):
        self.check_shared_without_callback(weakref.ref)

    def test_shared_proxy_without_callback(self):
        self.check_shared_without_callback(weakref.proxy)

    def check_shared_without_callback(self, makeref):
        # Helper: every combination of "omitted" and "None" callbacks
        # must yield the same shared ref/proxy object.
        o = Object(1)
        p1 = makeref(o, None)
        p2 = makeref(o, None)
        self.assert_(p1 is p2, "both callbacks were None in the C API")
        del p1, p2
        p1 = makeref(o)
        p2 = makeref(o, None)
        self.assert_(p1 is p2, "callbacks were NULL, None in the C API")
        del p1, p2
        p1 = makeref(o)
        p2 = makeref(o)
        self.assert_(p1 is p2, "both callbacks were NULL in the C API")
        del p1, p2
        p1 = makeref(o, None)
        p2 = makeref(o)
        self.assert_(p1 is p2, "callbacks were None, NULL in the C API")

    def test_callable_proxy(self):
        o = Callable()
        ref1 = weakref.proxy(o)

        self.check_proxy(o, ref1)

        self.assert_(type(ref1) is weakref.CallableProxyType,
                     "proxy is not of callable type")
        ref1('twinkies!')
        self.assert_(o.bar == 'twinkies!',
                     "call through proxy not passed through to original")
        ref1(x='Splat.')
        self.assert_(o.bar == 'Splat.',
                     "call through proxy not passed through to original")

        # expect due to too few args
        self.assertRaises(TypeError, ref1)

        # expect due to too many args
        self.assertRaises(TypeError, ref1, 1, 2, 3)

    def check_proxy(self, o, proxy):
        # Helper: attribute set/modify/delete must be visible in both
        # directions between the object and its proxy.
        o.foo = 1
        self.assert_(proxy.foo == 1,
                     "proxy does not reflect attribute addition")
        o.foo = 2
        self.assert_(proxy.foo == 2,
                     "proxy does not reflect attribute modification")
        del o.foo
        self.assert_(not hasattr(proxy, 'foo'),
                     "proxy does not reflect attribute removal")

        proxy.foo = 1
        self.assert_(o.foo == 1,
                     "object does not reflect attribute addition via proxy")
        proxy.foo = 2
        self.assert_(
            o.foo == 2,
            "object does not reflect attribute modification via proxy")
        del proxy.foo
        self.assert_(not hasattr(o, 'foo'),
                     "object does not reflect attribute removal via proxy")

    def test_proxy_deletion(self):
        # Test clearing of SF bug #762891
        class Foo:
            result = None
            def __delitem__(self, accessor):
                self.result = accessor
        g = Foo()
        f = weakref.proxy(g)
        del f[0]
        self.assertEqual(f.result, 0)

    def test_getweakrefcount(self):
        o = C()
        ref1 = weakref.ref(o)
        ref2 = weakref.ref(o, self.callback)
        self.assert_(weakref.getweakrefcount(o) == 2,
                     "got wrong number of weak reference objects")

        proxy1 = weakref.proxy(o)
        proxy2 = weakref.proxy(o, self.callback)
        self.assert_(weakref.getweakrefcount(o) == 4,
                     "got wrong number of weak reference objects")

        del ref1, ref2, proxy1, proxy2
        self.assert_(weakref.getweakrefcount(o) == 0,
                     "weak reference objects not unlinked from"
                     " referent when discarded.")

        # assumes ints do not support weakrefs
        self.assert_(weakref.getweakrefcount(1) == 0,
                     "got wrong number of weak reference objects for int")

    def test_getweakrefs(self):
        o = C()
        ref1 = weakref.ref(o, self.callback)
        ref2 = weakref.ref(o, self.callback)
        del ref1
        self.assert_(weakref.getweakrefs(o) == [ref2],
                     "list of refs does not match")

        o = C()
        ref1 = weakref.ref(o, self.callback)
        ref2 = weakref.ref(o, self.callback)
        del ref2
        self.assert_(weakref.getweakrefs(o) == [ref1],
                     "list of refs does not match")

        del ref1
        self.assert_(weakref.getweakrefs(o) == [],
                     "list of refs not cleared")

        # assumes ints do not support weakrefs
        self.assert_(weakref.getweakrefs(1) == [],
                     "list of refs does not match for int")

    def test_newstyle_number_ops(self):
        class F(float):
            pass
        f = F(2.0)
        p = weakref.proxy(f)
        self.assert_(p + 1.0 == 3.0)
        self.assert_(1.0 + p == 3.0)  # this used to SEGV

    def test_callbacks_protected(self):
        # Callbacks protected from already-set exceptions?
        # Regression test for SF bug #478534.
        class BogusError(Exception):
            pass
        data = {}
        def remove(k):
            del data[k]
        def encapsulate():
            f = lambda : ()
            data[weakref.ref(f, remove)] = None
            raise BogusError
        try:
            encapsulate()
        except BogusError:
            pass
        else:
            self.fail("exception not properly restored")
        try:
            encapsulate()
        except BogusError:
            pass
        else:
            self.fail("exception not properly restored")

    def test_sf_bug_840829(self):
        # "weakref callbacks and gc corrupt memory"
        # subtype_dealloc erroneously exposed a new-style instance
        # already in the process of getting deallocated to gc,
        # causing double-deallocation if the instance had a weakref
        # callback that triggered gc.
        # If the bug exists, there probably won't be an obvious symptom
        # in a release build.  In a debug build, a segfault will occur
        # when the second attempt to remove the instance from the "list
        # of all objects" occurs.

        import gc

        class C(object):
            pass

        c = C()
        wr = weakref.ref(c, lambda ignore: gc.collect())
        del c

        # There endeth the first part.  It gets worse.
        del wr

        c1 = C()
        c1.i = C()
        wr = weakref.ref(c1.i, lambda ignore: gc.collect())

        c2 = C()
        c2.c1 = c1
        del c1  # still alive because c2 points to it

        # Now when subtype_dealloc gets called on c2, it's not enough just
        # that c2 is immune from gc while the weakref callbacks associated
        # with c2 execute (there are none in this 2nd half of the test, btw).
        # subtype_dealloc goes on to call the base classes' deallocs too,
        # so any gc triggered by weakref callbacks associated with anything
        # torn down by a base class dealloc can also trigger double
        # deallocation of c2.
        del c2

    def test_callback_in_cycle_1(self):
        import gc

        class J(object):
            pass

        class II(object):
            def acallback(self, ignore):
                self.J

        I = II()
        I.J = J
        I.wr = weakref.ref(J, I.acallback)

        # Now J and II are each in a self-cycle (as all new-style class
        # objects are, since their __mro__ points back to them).  I holds
        # both a weak reference (I.wr) and a strong reference (I.J) to class
        # J.  I is also in a cycle (I.wr points to a weakref that references
        # I.acallback).  When we del these three, they all become trash, but
        # the cycles prevent any of them from getting cleaned up immediately.
        # Instead they have to wait for cyclic gc to deduce that they're
        # trash.
        #
        # gc used to call tp_clear on all of them, and the order in which
        # it does that is pretty accidental.  The exact order in which we
        # built up these things manages to provoke gc into running tp_clear
        # in just the right order (I last).  Calling tp_clear on II leaves
        # behind an insane class object (its __mro__ becomes NULL).  Calling
        # tp_clear on J breaks its self-cycle, but J doesn't get deleted
        # just then because of the strong reference from I.J.  Calling
        # tp_clear on I starts to clear I's __dict__, and just happens to
        # clear I.J first -- I.wr is still intact.  That removes the last
        # reference to J, which triggers the weakref callback.  The callback
        # tries to do "self.J", and instances of new-style classes look up
        # attributes ("J") in the class dict first.  The class (II) wants to
        # search II.__mro__, but that's NULL.   The result was a segfault in
        # a release build, and an assert failure in a debug build.
        del I, J, II
        gc.collect()

    def test_callback_in_cycle_2(self):
        import gc

        # This is just like test_callback_in_cycle_1, except that II is an
        # old-style class.  The symptom is different then:  an instance of an
        # old-style class looks in its own __dict__ first.  'J' happens to
        # get cleared from I.__dict__ before 'wr', and 'J' was never in II's
        # __dict__, so the attribute isn't found.  The difference is that
        # the old-style II doesn't have a NULL __mro__ (it doesn't have any
        # __mro__), so no segfault occurs.  Instead it got:
        #    test_callback_in_cycle_2 (__main__.ReferencesTestCase) ...
        #    Exception exceptions.AttributeError:
        #   "II instance has no attribute 'J'" in <bound method II.acallback
        #       of <?.II instance at 0x00B9B4B8>> ignored

        class J(object):
            pass

        class II:
            def acallback(self, ignore):
                self.J

        I = II()
        I.J = J
        I.wr = weakref.ref(J, I.acallback)

        del I, J, II
        gc.collect()

    def test_callback_in_cycle_3(self):
        import gc

        # This one broke the first patch that fixed the last two.  In this
        # case, the objects reachable from the callback aren't also reachable
        # from the object (c1) *triggering* the callback:  you can get to
        # c1 from c2, but not vice-versa.  The result was that c2's __dict__
        # got tp_clear'ed by the time the c2.cb callback got invoked.

        class C:
            def cb(self, ignore):
                self.me
                self.c1
                self.wr

        c1, c2 = C(), C()

        c2.me = c2
        c2.c1 = c1
        c2.wr = weakref.ref(c1, c2.cb)

        del c1, c2
        gc.collect()

    def test_callback_in_cycle_4(self):
        import gc

        # Like test_callback_in_cycle_3, except c2 and c1 have different
        # classes.  c2's class (C) isn't reachable from c1 then, so protecting
        # objects reachable from the dying object (c1) isn't enough to stop
        # c2's class (C) from getting tp_clear'ed before c2.cb is invoked.
        # The result was a segfault (C.__mro__ was NULL when the callback
        # tried to look up self.me).

        class C(object):
            def cb(self, ignore):
                self.me
                self.c1
                self.wr

        class D:
            pass

        c1, c2 = D(), C()

        c2.me = c2
        c2.c1 = c1
        c2.wr = weakref.ref(c1, c2.cb)

        del c1, c2, C, D
        gc.collect()

    def test_callback_in_cycle_resurrection(self):
        import gc

        # Do something nasty in a weakref callback:  resurrect objects
        # from dead cycles.  For this to be attempted, the weakref and
        # its callback must also be part of the cyclic trash (else the
        # objects reachable via the callback couldn't be in cyclic trash
        # to begin with -- the callback would act like an external root).
        # But gc clears trash weakrefs with callbacks early now, which
        # disables the callbacks, so the callbacks shouldn't get called
        # at all (and so nothing actually gets resurrected).

        alist = []
        class C(object):
            def __init__(self, value):
                self.attribute = value

            def acallback(self, ignore):
                alist.append(self.c)

        c1, c2 = C(1), C(2)
        c1.c = c2
        c2.c = c1
        c1.wr = weakref.ref(c2, c1.acallback)
        c2.wr = weakref.ref(c1, c2.acallback)

        def C_went_away(ignore):
            alist.append("C went away")
        wr = weakref.ref(C, C_went_away)

        del c1, c2, C   # make them all trash
        self.assertEqual(alist, [])  # del isn't enough to reclaim anything

        gc.collect()
        # c1.wr and c2.wr were part of the cyclic trash, so should have
        # been cleared without their callbacks executing.  OTOH, the weakref
        # to C is bound to a function local (wr), and wasn't trash, so that
        # callback should have been invoked when C went away.
        self.assertEqual(alist, ["C went away"])
        # The remaining weakref should be dead now (its callback ran).
        self.assertEqual(wr(), None)

        del alist[:]
        gc.collect()
        self.assertEqual(alist, [])

    def test_callbacks_on_callback(self):
        import gc

        # Set up weakref callbacks *on* weakref callbacks.
        alist = []
        def safe_callback(ignore):
            alist.append("safe_callback called")

        class C(object):
            def cb(self, ignore):
                alist.append("cb called")

        c, d = C(), C()
        c.other = d
        d.other = c
        callback = c.cb
        c.wr = weakref.ref(d, callback)     # this won't trigger
        d.wr = weakref.ref(callback, d.cb)  # ditto
        external_wr = weakref.ref(callback, safe_callback)  # but this will
        self.assert_(external_wr() is callback)

        # The weakrefs attached to c and d should get cleared, so that
        # C.cb is never called.  But external_wr isn't part of the cyclic
        # trash, and no cyclic trash is reachable from it, so safe_callback
        # should get invoked when the bound method object callback (c.cb)
        # -- which is itself a callback, and also part of the cyclic trash --
        # gets reclaimed at the end of gc.

        del callback, c, d, C
        self.assertEqual(alist, [])  # del isn't enough to clean up cycles
        gc.collect()
        self.assertEqual(alist, ["safe_callback called"])
        self.assertEqual(external_wr(), None)

        del alist[:]
        gc.collect()
        self.assertEqual(alist, [])

    def test_gc_during_ref_creation(self):
        self.check_gc_during_creation(weakref.ref)

    def test_gc_during_proxy_creation(self):
        self.check_gc_during_creation(weakref.proxy)

    def check_gc_during_creation(self, makeref):
        # Helper: create refs while gc thresholds are set to (1, 1, 1).
        # The try block starts immediately after the thresholds are
        # changed so that they are restored even if the setup below raises.
        thresholds = gc.get_threshold()
        gc.set_threshold(1, 1, 1)
        try:
            gc.collect()
            class A:
                pass

            def callback(*args):
                pass

            referenced = A()

            a = A()
            a.a = a
            a.wr = makeref(referenced)

            # now make sure the object and the ref get labeled as
            # cyclic trash:
            a = A()
            weakref.ref(referenced, callback)

        finally:
            gc.set_threshold(*thresholds)


class SubclassableWeakrefTestCase(unittest.TestCase):
    """Tests for user-defined subclasses of weakref.ref."""

    def test_subclass_refs(self):
        class MyRef(weakref.ref):
            def __init__(self, ob, callback=None, value=42):
                self.value = value
                super(MyRef, self).__init__(ob, callback)
            def __call__(self):
                self.called = True
                return super(MyRef, self).__call__()
        o = Object("foo")
        mr = MyRef(o, value=24)
        self.assert_(mr() is o)
        self.assert_(mr.called)
        self.assertEqual(mr.value, 24)
        del o
        self.assert_(mr() is None)
        self.assert_(mr.called)

    def test_subclass_refs_dont_replace_standard_refs(self):
        class MyRef(weakref.ref):
            pass
        o = Object(42)
        r1 = MyRef(o)
        r2 = weakref.ref(o)
        self.assert_(r1 is not r2)
        self.assertEqual(weakref.getweakrefs(o), [r2, r1])
        self.assertEqual(weakref.getweakrefcount(o), 2)
        r3 = MyRef(o)
        self.assertEqual(weakref.getweakrefcount(o), 3)
        refs = weakref.getweakrefs(o)
        self.assertEqual(len(refs), 3)
        self.assert_(r2 is refs[0])
        self.assert_(r1 in refs[1:])
        self.assert_(r3 in refs[1:])

    def test_subclass_refs_dont_conflate_callbacks(self):
        class MyRef(weakref.ref):
            pass
        o = Object(42)
        r1 = MyRef(o, id)
        r2 = MyRef(o, str)
        self.assert_(r1 is not r2)
        refs = weakref.getweakrefs(o)
        self.assert_(r1 in refs)
        self.assert_(r2 in refs)

    def test_subclass_refs_with_slots(self):
        class MyRef(weakref.ref):
            __slots__ = "slot1", "slot2"
            def __new__(type, ob, callback, slot1, slot2):
                return weakref.ref.__new__(type, ob, callback)
            def __init__(self, ob, callback, slot1, slot2):
                self.slot1 = slot1
                self.slot2 = slot2
            def meth(self):
                return self.slot1 + self.slot2
        o = Object(42)
        r = MyRef(o, None, "abc", "def")
        self.assertEqual(r.slot1, "abc")
        self.assertEqual(r.slot2, "def")
        self.assertEqual(r.meth(), "abcdef")
        self.failIf(hasattr(r, "__dict__"))


class Object:
    """Weakly referenceable wrapper around a single value, ``arg``."""

    def __init__(self, arg):
        self.arg = arg
    def __repr__(self):
        return "<Object %r>" % self.arg


class MappingTestCase(TestBase):
    """Tests for WeakKeyDictionary and WeakValueDictionary."""

    # Number of Object instances placed in each dictionary built by the
    # make_weak_*_dict() helpers.
    COUNT = 10

    def test_weak_values(self):
        #
        #  This exercises d.copy(), d.items(), d[], del d[], len(d).
        #
        dict, objects = self.make_weak_valued_dict()
        for o in objects:
            self.assert_(weakref.getweakrefcount(o) == 1,
                         "wrong number of weak references to %r!" % o)
            self.assert_(o is dict[o.arg],
                         "wrong object returned by weak dict!")
        items1 = dict.items()
        items2 = dict.copy().items()
        items1.sort()
        items2.sort()
        self.assert_(items1 == items2,
                     "cloning of weak-valued dictionary did not work!")
        del items1, items2
        self.assert_(len(dict) == self.COUNT)
        del objects[0]
        self.assert_(len(dict) == (self.COUNT - 1),
                     "deleting object did not cause dictionary update")
        del objects, o
        self.assert_(len(dict) == 0,
                     "deleting the values did not clear the dictionary")
        # regression on SF bug #447152:
        dict = weakref.WeakValueDictionary()
        self.assertRaises(KeyError, dict.__getitem__, 1)
        dict[2] = C()
        self.assertRaises(KeyError, dict.__getitem__, 2)

    def test_weak_keys(self):
        #
        #  This exercises d.copy(), d.items(), d[] = v, d[], del d[],
        #  len(d), d.has_key().
        #
        dict, objects = self.make_weak_keyed_dict()
        for o in objects:
            self.assert_(weakref.getweakrefcount(o) == 1,
                         "wrong number of weak references to %r!" % o)
            self.assert_(o.arg is dict[o],
                         "wrong object returned by weak dict!")
        items1 = dict.items()
        items2 = dict.copy().items()
        self.assert_(set(items1) == set(items2),
                     "cloning of weak-keyed dictionary did not work!")
        del items1, items2
        self.assert_(len(dict) == self.COUNT)
        del objects[0]
        self.assert_(len(dict) == (self.COUNT - 1),
                     "deleting object did not cause dictionary update")
        del objects, o
        self.assert_(len(dict) == 0,
                     "deleting the keys did not clear the dictionary")
        o = Object(42)
        dict[o] = "What is the meaning of the universe?"
        self.assert_(dict.has_key(o))
        self.assert_(not dict.has_key(34))

    def test_weak_keyed_iters(self):
        dict, objects = self.make_weak_keyed_dict()
        self.check_iters(dict)

    def test_weak_valued_iters(self):
        dict, objects = self.make_weak_valued_dict()
        self.check_iters(dict)

    def check_iters(self, dict):
        # Helper: each iterator must visit exactly the entries reported
        # by the corresponding list-returning method.
        # item iterator:
        items = dict.items()
        for item in dict.iteritems():
            items.remove(item)
        self.assert_(len(items) == 0, "iteritems() did not touch all items")

        # key iterator, via __iter__():
        keys = dict.keys()
        for k in dict:
            keys.remove(k)
        self.assert_(len(keys) == 0, "__iter__() did not touch all keys")

        # key iterator, via iterkeys():
        keys = dict.keys()
        for k in dict.iterkeys():
            keys.remove(k)
        self.assert_(len(keys) == 0, "iterkeys() did not touch all keys")

        # value iterator:
        values = dict.values()
        for v in dict.itervalues():
            values.remove(v)
        self.assert_(len(values) == 0,
                     "itervalues() did not touch all values")

    def test_make_weak_keyed_dict_from_dict(self):
        o = Object(3)
        dict = weakref.WeakKeyDictionary({o:364})
        self.assert_(dict[o] == 364)

    def test_make_weak_keyed_dict_from_weak_keyed_dict(self):
        o = Object(3)
        dict = weakref.WeakKeyDictionary({o:364})
        dict2 = weakref.WeakKeyDictionary(dict)
        self.assert_(dict[o] == 364)
        # The copy is the object under test; make sure it got the entry.
        self.assert_(dict2[o] == 364)

    def make_weak_keyed_dict(self):
        # Helper: return (WeakKeyDictionary, list of its COUNT key objects).
        dict = weakref.WeakKeyDictionary()
        objects = map(Object, range(self.COUNT))
        for o in objects:
            dict[o] = o.arg
        return dict, objects

    def make_weak_valued_dict(self):
        # Helper: return (WeakValueDictionary, list of its COUNT values).
        dict = weakref.WeakValueDictionary()
        objects = map(Object, range(self.COUNT))
        for o in objects:
            dict[o.arg] = o
        return dict, objects

    def check_popitem(self, klass, key1, value1, key2, value2):
        # Helper: popitem() must return matching key/value pairs and
        # shrink the dictionary by one each time.
        weakdict = klass()
        weakdict[key1] = value1
        weakdict[key2] = value2
        self.assert_(len(weakdict) == 2)
        k, v = weakdict.popitem()
        self.assert_(len(weakdict) == 1)
        if k is key1:
            self.assert_(v is value1)
        else:
            self.assert_(v is value2)
        k, v = weakdict.popitem()
        self.assert_(len(weakdict) == 0)
        if k is key1:
            self.assert_(v is value1)
        else:
            self.assert_(v is value2)

    def test_weak_valued_dict_popitem(self):
        self.check_popitem(weakref.WeakValueDictionary,
                           "key1", C(), "key2", C())

    def test_weak_keyed_dict_popitem(self):
        self.check_popitem(weakref.WeakKeyDictionary,
                           C(), "value 1", C(), "value 2")

    def check_setdefault(self, klass, key, value1, value2):
        # Helper: setdefault() must insert on first use and leave the
        # existing value untouched on the second.
        self.assert_(value1 is not value2,
                     "invalid test"
                     " -- value parameters must be distinct objects")
        weakdict = klass()
        o = weakdict.setdefault(key, value1)
        self.assert_(o is value1)
        self.assert_(weakdict.has_key(key))
        self.assert_(weakdict.get(key) is value1)
        self.assert_(weakdict[key] is value1)

        o = weakdict.setdefault(key, value2)
        self.assert_(o is value1)
        self.assert_(weakdict.has_key(key))
        self.assert_(weakdict.get(key) is value1)
        self.assert_(weakdict[key] is value1)

    def test_weak_valued_dict_setdefault(self):
        self.check_setdefault(weakref.WeakValueDictionary,
                              "key", C(), C())

    def test_weak_keyed_dict_setdefault(self):
        self.check_setdefault(weakref.WeakKeyDictionary,
                              C(), "value 1", "value 2")

    def check_update(self, klass, dict):
        #
        #  This exercises d.update(), len(d), d.keys(), d.has_key(),
        #  d.get(), d[].
        #
        weakdict = klass()
        weakdict.update(dict)
        self.assert_(len(weakdict) == len(dict))
        for k in weakdict.keys():
            self.assert_(dict.has_key(k),
                         "mysterious new key appeared in weak dict")
            v = dict.get(k)
            self.assert_(v is weakdict[k])
            self.assert_(v is weakdict.get(k))
        for k in dict.keys():
            self.assert_(weakdict.has_key(k),
                         "original key disappeared in weak dict")
            v = dict[k]
            self.assert_(v is weakdict[k])
            self.assert_(v is weakdict.get(k))

    def test_weak_valued_dict_update(self):
        self.check_update(weakref.WeakValueDictionary,
                          {1: C(), 'a': C(), C(): C()})

    def test_weak_keyed_dict_update(self):
        self.check_update(weakref.WeakKeyDictionary,
                          {C(): 1, C(): 2, C(): 3})

    def test_weak_keyed_delitem(self):
        d = weakref.WeakKeyDictionary()
        o1 = Object('1')
        o2 = Object('2')
        d[o1] = 'something'
        d[o2] = 'something'
        self.assert_(len(d) == 2)
        del d[o1]
        self.assert_(len(d) == 1)
        self.assert_(d.keys() == [o2])

    def test_weak_valued_delitem(self):
        d = weakref.WeakValueDictionary()
        o1 = Object('1')
        o2 = Object('2')
        d['something'] = o1
        d['something else'] = o2
        self.assert_(len(d) == 2)
        del d['something']
        self.assert_(len(d) == 1)
        self.assert_(d.items() == [('something else', o2)])

    def test_weak_keyed_bad_delitem(self):
        d = weakref.WeakKeyDictionary()
        o = Object('1')
        # An attempt to delete an object that isn't there should raise
        # KeyError.  It didn't before 2.3.
        self.assertRaises(KeyError, d.__delitem__, o)
        self.assertRaises(KeyError, d.__getitem__, o)

        # If a key isn't of a weakly referencable type, __getitem__ and
        # __setitem__ raise TypeError.  __delitem__ should too.
        self.assertRaises(TypeError, d.__delitem__, 13)
        self.assertRaises(TypeError, d.__getitem__, 13)
        self.assertRaises(TypeError, d.__setitem__, 13, 13)

    def test_weak_keyed_cascading_deletes(self):
        # SF bug 742860.  For some reason, before 2.3 __delitem__ iterated
        # over the keys via self.data.iterkeys().  If things vanished from
        # the dict during this (or got added), that caused a RuntimeError.

        d = weakref.WeakKeyDictionary()
        mutate = False

        class C(object):
            def __init__(self, i):
                self.value = i
            def __hash__(self):
                return hash(self.value)
            def __eq__(self, other):
                if mutate:
                    # Side effect that mutates the dict, by removing the
                    # last strong reference to a key.
                    del objs[-1]
                return self.value == other.value

        objs = [C(i) for i in range(4)]
        for o in objs:
            d[o] = o.value
        del o   # now the only strong references to keys are in objs
        # Find the order in which iterkeys sees the keys.
        objs = d.keys()
        # Reverse it, so that the iteration implementation of __delitem__
        # has to keep looping to find the first object we delete.
        objs.reverse()

        # Turn on mutation in C.__eq__.  The first time thru the loop,
        # under the iterkeys() business the first comparison will delete
        # the last item iterkeys() would see, and that causes a
        #     RuntimeError: dictionary changed size during iteration
        # when the iterkeys() loop goes around to try comparing the next
        # key.  After this was fixed, it just deletes the last object *our*
        # "for o in obj" loop would have gotten to.
        mutate = True
        count = 0
        for o in objs:
            count += 1
            del d[o]
        self.assertEqual(len(d), 0)
        self.assertEqual(count, 2)

from test import mapping_tests

class WeakValueDictionaryTestCase(mapping_tests.BasicTestMappingProtocol):
    """Check that WeakValueDictionary conforms to the mapping protocol"""
    # Class-level reference data keeps strong references to the values.
    __ref = {"key1":Object(1), "key2":Object(2), "key3":Object(3)}
    type2test = weakref.WeakValueDictionary
    def _reference(self):
        return self.__ref.copy()

class WeakKeyDictionaryTestCase(mapping_tests.BasicTestMappingProtocol):
    """Check that WeakKeyDictionary conforms to the mapping protocol"""
    # Class-level reference data keeps strong references to the keys.
    __ref = {Object("key1"):1, Object("key2"):2, Object("key3"):3}
    type2test = weakref.WeakKeyDictionary
    def _reference(self):
        return self.__ref.copy()

def test_main():
    """Run every test case class defined in this module."""
    test_support.run_unittest(
        ReferencesTestCase,
        SubclassableWeakrefTestCase,
        MappingTestCase,
        WeakValueDictionaryTestCase,
        WeakKeyDictionaryTestCase,
        )


if __name__ == "__main__":
    test_main()
# Generated by PyXR 0.9.4