PyXR

c:\python24\lib \ test \ test_weakref.py



0001 import gc
0002 import sys
0003 import unittest
0004 import UserList
0005 import weakref
0006 
0007 from test import test_support
0008 
0009 
class C:
    # Bare-bones weak-referenceable class.  ``method`` does nothing; it is
    # here so that bound and unbound method objects can be built from it.
    def method(self):
        return None
0013 
0014 
class Callable:
    # Calling an instance stores the call argument on ``bar``.  Until the
    # first call the class-level default (None) shows through.
    bar = None

    def __call__(self, x):
        # Keep the most recent argument; nothing is returned.
        self.bar = x
0020 
0021 
def create_function():
    # Every call defines (and returns) a brand-new function object, so the
    # caller ends up holding the only reference to it.
    def f():
        pass
    return f
0025 
def create_bound_method():
    # The returned bound method is the only thing that refers to the
    # throw-away instance it was taken from.
    instance = C()
    return instance.method
0028 
def create_unbound_method():
    # Looked up on the class itself, not on an instance.
    method = C.method
    return method
0031 
0032 
class TestBase(unittest.TestCase):
    # Shared fixture for the weakref tests: ``callback`` is handed to
    # weakref.ref()/weakref.proxy() and ``cbcalled`` counts its invocations.

    def setUp(self):
        # Start every test with a clean invocation count.
        self.cbcalled = 0

    def callback(self, ref):
        # ``ref`` is the weak reference object being cleared; it is ignored.
        self.cbcalled = self.cbcalled + 1
0040 
0041 
0042 class ReferencesTestCase(TestBase):
0043 
0044     def test_basic_ref(self):
0045         self.check_basic_ref(C)
0046         self.check_basic_ref(create_function)
0047         self.check_basic_ref(create_bound_method)
0048         self.check_basic_ref(create_unbound_method)
0049 
0050         # Just make sure the tp_repr handler doesn't raise an exception.
0051         # Live reference:
0052         o = C()
0053         wr = weakref.ref(o)
0054         `wr`
0055         # Dead reference:
0056         del o
0057         `wr`
0058 
0059     def test_basic_callback(self):
0060         self.check_basic_callback(C)
0061         self.check_basic_callback(create_function)
0062         self.check_basic_callback(create_bound_method)
0063         self.check_basic_callback(create_unbound_method)
0064 
0065     def test_multiple_callbacks(self):
0066         o = C()
0067         ref1 = weakref.ref(o, self.callback)
0068         ref2 = weakref.ref(o, self.callback)
0069         del o
0070         self.assert_(ref1() is None,
0071                      "expected reference to be invalidated")
0072         self.assert_(ref2() is None,
0073                      "expected reference to be invalidated")
0074         self.assert_(self.cbcalled == 2,
0075                      "callback not called the right number of times")
0076 
0077     def test_multiple_selfref_callbacks(self):
0078         # Make sure all references are invalidated before callbacks are called
0079         #
0080         # What's important here is that we're using the first
0081         # reference in the callback invoked on the second reference
0082         # (the most recently created ref is cleaned up first).  This
0083         # tests that all references to the object are invalidated
0084         # before any of the callbacks are invoked, so that we only
0085         # have one invocation of _weakref.c:cleanup_helper() active
0086         # for a particular object at a time.
0087         #
0088         def callback(object, self=self):
0089             self.ref()
0090         c = C()
0091         self.ref = weakref.ref(c, callback)
0092         ref1 = weakref.ref(c, callback)
0093         del c
0094 
0095     def test_proxy_ref(self):
0096         o = C()
0097         o.bar = 1
0098         ref1 = weakref.proxy(o, self.callback)
0099         ref2 = weakref.proxy(o, self.callback)
0100         del o
0101 
0102         def check(proxy):
0103             proxy.bar
0104 
0105         self.assertRaises(weakref.ReferenceError, check, ref1)
0106         self.assertRaises(weakref.ReferenceError, check, ref2)
0107         self.assertRaises(weakref.ReferenceError, bool, weakref.proxy(C()))
0108         self.assert_(self.cbcalled == 2)
0109 
0110     def check_basic_ref(self, factory):
0111         o = factory()
0112         ref = weakref.ref(o)
0113         self.assert_(ref() is not None,
0114                      "weak reference to live object should be live")
0115         o2 = ref()
0116         self.assert_(o is o2,
0117                      "<ref>() should return original object if live")
0118 
0119     def check_basic_callback(self, factory):
0120         self.cbcalled = 0
0121         o = factory()
0122         ref = weakref.ref(o, self.callback)
0123         del o
0124         self.assert_(self.cbcalled == 1,
0125                      "callback did not properly set 'cbcalled'")
0126         self.assert_(ref() is None,
0127                      "ref2 should be dead after deleting object reference")
0128 
0129     def test_ref_reuse(self):
0130         o = C()
0131         ref1 = weakref.ref(o)
0132         # create a proxy to make sure that there's an intervening creation
0133         # between these two; it should make no difference
0134         proxy = weakref.proxy(o)
0135         ref2 = weakref.ref(o)
0136         self.assert_(ref1 is ref2,
0137                      "reference object w/out callback should be re-used")
0138 
0139         o = C()
0140         proxy = weakref.proxy(o)
0141         ref1 = weakref.ref(o)
0142         ref2 = weakref.ref(o)
0143         self.assert_(ref1 is ref2,
0144                      "reference object w/out callback should be re-used")
0145         self.assert_(weakref.getweakrefcount(o) == 2,
0146                      "wrong weak ref count for object")
0147         del proxy
0148         self.assert_(weakref.getweakrefcount(o) == 1,
0149                      "wrong weak ref count for object after deleting proxy")
0150 
0151     def test_proxy_reuse(self):
0152         o = C()
0153         proxy1 = weakref.proxy(o)
0154         ref = weakref.ref(o)
0155         proxy2 = weakref.proxy(o)
0156         self.assert_(proxy1 is proxy2,
0157                      "proxy object w/out callback should have been re-used")
0158 
0159     def test_basic_proxy(self):
0160         o = C()
0161         self.check_proxy(o, weakref.proxy(o))
0162 
0163         L = UserList.UserList()
0164         p = weakref.proxy(L)
0165         self.failIf(p, "proxy for empty UserList should be false")
0166         p.append(12)
0167         self.assertEqual(len(L), 1)
0168         self.failUnless(p, "proxy for non-empty UserList should be true")
0169         p[:] = [2, 3]
0170         self.assertEqual(len(L), 2)
0171         self.assertEqual(len(p), 2)
0172         self.failUnless(3 in p,
0173                         "proxy didn't support __contains__() properly")
0174         p[1] = 5
0175         self.assertEqual(L[1], 5)
0176         self.assertEqual(p[1], 5)
0177         L2 = UserList.UserList(L)
0178         p2 = weakref.proxy(L2)
0179         self.assertEqual(p, p2)
0180         ## self.assertEqual(repr(L2), repr(p2))
0181         L3 = UserList.UserList(range(10))
0182         p3 = weakref.proxy(L3)
0183         self.assertEqual(L3[:], p3[:])
0184         self.assertEqual(L3[5:], p3[5:])
0185         self.assertEqual(L3[:5], p3[:5])
0186         self.assertEqual(L3[2:5], p3[2:5])
0187 
0188     # The PyWeakref_* C API is documented as allowing either NULL or
0189     # None as the value for the callback, where either means "no
0190     # callback".  The "no callback" ref and proxy objects are supposed
0191     # to be shared so long as they exist by all callers so long as
0192     # they are active.  In Python 2.3.3 and earlier, this guarantee
0193     # was not honored, and was broken in different ways for
0194     # PyWeakref_NewRef() and PyWeakref_NewProxy().  (Two tests.)
0195 
0196     def test_shared_ref_without_callback(self):
0197         self.check_shared_without_callback(weakref.ref)
0198 
0199     def test_shared_proxy_without_callback(self):
0200         self.check_shared_without_callback(weakref.proxy)
0201 
0202     def check_shared_without_callback(self, makeref):
0203         o = Object(1)
0204         p1 = makeref(o, None)
0205         p2 = makeref(o, None)
0206         self.assert_(p1 is p2, "both callbacks were None in the C API")
0207         del p1, p2
0208         p1 = makeref(o)
0209         p2 = makeref(o, None)
0210         self.assert_(p1 is p2, "callbacks were NULL, None in the C API")
0211         del p1, p2
0212         p1 = makeref(o)
0213         p2 = makeref(o)
0214         self.assert_(p1 is p2, "both callbacks were NULL in the C API")
0215         del p1, p2
0216         p1 = makeref(o, None)
0217         p2 = makeref(o)
0218         self.assert_(p1 is p2, "callbacks were None, NULL in the C API")
0219 
0220     def test_callable_proxy(self):
0221         o = Callable()
0222         ref1 = weakref.proxy(o)
0223 
0224         self.check_proxy(o, ref1)
0225 
0226         self.assert_(type(ref1) is weakref.CallableProxyType,
0227                      "proxy is not of callable type")
0228         ref1('twinkies!')
0229         self.assert_(o.bar == 'twinkies!',
0230                      "call through proxy not passed through to original")
0231         ref1(x='Splat.')
0232         self.assert_(o.bar == 'Splat.',
0233                      "call through proxy not passed through to original")
0234 
0235         # expect due to too few args
0236         self.assertRaises(TypeError, ref1)
0237 
0238         # expect due to too many args
0239         self.assertRaises(TypeError, ref1, 1, 2, 3)
0240 
0241     def check_proxy(self, o, proxy):
0242         o.foo = 1
0243         self.assert_(proxy.foo == 1,
0244                      "proxy does not reflect attribute addition")
0245         o.foo = 2
0246         self.assert_(proxy.foo == 2,
0247                      "proxy does not reflect attribute modification")
0248         del o.foo
0249         self.assert_(not hasattr(proxy, 'foo'),
0250                      "proxy does not reflect attribute removal")
0251 
0252         proxy.foo = 1
0253         self.assert_(o.foo == 1,
0254                      "object does not reflect attribute addition via proxy")
0255         proxy.foo = 2
0256         self.assert_(
0257             o.foo == 2,
0258             "object does not reflect attribute modification via proxy")
0259         del proxy.foo
0260         self.assert_(not hasattr(o, 'foo'),
0261                      "object does not reflect attribute removal via proxy")
0262 
0263     def test_proxy_deletion(self):
0264         # Test clearing of SF bug #762891
0265         class Foo:
0266             result = None
0267             def __delitem__(self, accessor):
0268                 self.result = accessor
0269         g = Foo()
0270         f = weakref.proxy(g)
0271         del f[0]
0272         self.assertEqual(f.result, 0)
0273 
0274     def test_getweakrefcount(self):
0275         o = C()
0276         ref1 = weakref.ref(o)
0277         ref2 = weakref.ref(o, self.callback)
0278         self.assert_(weakref.getweakrefcount(o) == 2,
0279                      "got wrong number of weak reference objects")
0280 
0281         proxy1 = weakref.proxy(o)
0282         proxy2 = weakref.proxy(o, self.callback)
0283         self.assert_(weakref.getweakrefcount(o) == 4,
0284                      "got wrong number of weak reference objects")
0285 
0286         del ref1, ref2, proxy1, proxy2
0287         self.assert_(weakref.getweakrefcount(o) == 0,
0288                      "weak reference objects not unlinked from"
0289                      " referent when discarded.")
0290 
0291         # assumes ints do not support weakrefs
0292         self.assert_(weakref.getweakrefcount(1) == 0,
0293                      "got wrong number of weak reference objects for int")
0294 
0295     def test_getweakrefs(self):
0296         o = C()
0297         ref1 = weakref.ref(o, self.callback)
0298         ref2 = weakref.ref(o, self.callback)
0299         del ref1
0300         self.assert_(weakref.getweakrefs(o) == [ref2],
0301                      "list of refs does not match")
0302 
0303         o = C()
0304         ref1 = weakref.ref(o, self.callback)
0305         ref2 = weakref.ref(o, self.callback)
0306         del ref2
0307         self.assert_(weakref.getweakrefs(o) == [ref1],
0308                      "list of refs does not match")
0309 
0310         del ref1
0311         self.assert_(weakref.getweakrefs(o) == [],
0312                      "list of refs not cleared")
0313 
0314         # assumes ints do not support weakrefs
0315         self.assert_(weakref.getweakrefs(1) == [],
0316                      "list of refs does not match for int")
0317 
0318     def test_newstyle_number_ops(self):
0319         class F(float):
0320             pass
0321         f = F(2.0)
0322         p = weakref.proxy(f)
0323         self.assert_(p + 1.0 == 3.0)
0324         self.assert_(1.0 + p == 3.0)  # this used to SEGV
0325 
0326     def test_callbacks_protected(self):
0327         # Callbacks protected from already-set exceptions?
0328         # Regression test for SF bug #478534.
0329         class BogusError(Exception):
0330             pass
0331         data = {}
0332         def remove(k):
0333             del data[k]
0334         def encapsulate():
0335             f = lambda : ()
0336             data[weakref.ref(f, remove)] = None
0337             raise BogusError
0338         try:
0339             encapsulate()
0340         except BogusError:
0341             pass
0342         else:
0343             self.fail("exception not properly restored")
0344         try:
0345             encapsulate()
0346         except BogusError:
0347             pass
0348         else:
0349             self.fail("exception not properly restored")
0350 
    def test_sf_bug_840829(self):
        # "weakref callbacks and gc corrupt memory"
        # subtype_dealloc erroneously exposed a new-style instance
        # already in the process of getting deallocated to gc,
        # causing double-deallocation if the instance had a weakref
        # callback that triggered gc.
        # If the bug exists, there probably won't be an obvious symptom
        # in a release build.  In a debug build, a segfault will occur
        # when the second attempt to remove the instance from the "list
        # of all objects" occurs.
        #
        # The test makes no assertions: it passes if the interpreter
        # survives the two scenarios below.

        import gc

        # A new-style class local to this test; inside this method the name
        # shadows the module-level C.
        class C(object):
            pass

        # Part one: the weakref callback on the instance itself runs a
        # collection.
        c = C()
        wr = weakref.ref(c, lambda ignore: gc.collect())
        del c

        # There endeth the first part.  It gets worse.
        del wr

        # Part two: the callback hangs off c1.i, an object that is only
        # reachable through c1, which in turn ends up reachable only
        # through c2.
        c1 = C()
        c1.i = C()
        wr = weakref.ref(c1.i, lambda ignore: gc.collect())

        c2 = C()
        c2.c1 = c1
        del c1  # still alive because c2 points to it

        # Now when subtype_dealloc gets called on c2, it's not enough just
        # that c2 is immune from gc while the weakref callbacks associated
        # with c2 execute (there are none in this 2nd half of the test, btw).
        # subtype_dealloc goes on to call the base classes' deallocs too,
        # so any gc triggered by weakref callbacks associated with anything
        # torn down by a base class dealloc can also trigger double
        # deallocation of c2.
        del c2
0390 
    def test_callback_in_cycle_1(self):
        # A weakref callback that is itself part of cyclic trash must not
        # be run against objects gc has already torn down.  There are no
        # assertions; surviving gc.collect() is the test.  The order in
        # which the objects are created matters (see the note below), so
        # do not reorder the statements.
        import gc

        class J(object):
            pass

        class II(object):
            def acallback(self, ignore):
                self.J

        I = II()
        I.J = J
        I.wr = weakref.ref(J, I.acallback)

        # Now J and II are each in a self-cycle (as all new-style class
        # objects are, since their __mro__ points back to them).  I holds
        # both a weak reference (I.wr) and a strong reference (I.J) to class
        # J.  I is also in a cycle (I.wr points to a weakref that references
        # I.acallback).  When we del these three, they all become trash, but
        # the cycles prevent any of them from getting cleaned up immediately.
        # Instead they have to wait for cyclic gc to deduce that they're
        # trash.
        #
        # gc used to call tp_clear on all of them, and the order in which
        # it does that is pretty accidental.  The exact order in which we
        # built up these things manages to provoke gc into running tp_clear
        # in just the right order (I last).  Calling tp_clear on II leaves
        # behind an insane class object (its __mro__ becomes NULL).  Calling
        # tp_clear on J breaks its self-cycle, but J doesn't get deleted
        # just then because of the strong reference from I.J.  Calling
        # tp_clear on I starts to clear I's __dict__, and just happens to
        # clear I.J first -- I.wr is still intact.  That removes the last
        # reference to J, which triggers the weakref callback.  The callback
        # tries to do "self.J", and instances of new-style classes look up
        # attributes ("J") in the class dict first.  The class (II) wants to
        # search II.__mro__, but that's NULL.   The result was a segfault in
        # a release build, and an assert failure in a debug build.
        del I, J, II
        gc.collect()
0430 
    def test_callback_in_cycle_2(self):
        # Old-style-class variant of test_callback_in_cycle_1.  There are
        # no assertions; the test only runs the scenario described below.
        import gc

        # This is just like test_callback_in_cycle_1, except that II is an
        # old-style class.  The symptom is different then:  an instance of an
        # old-style class looks in its own __dict__ first.  'J' happens to
        # get cleared from I.__dict__ before 'wr', and 'J' was never in II's
        # __dict__, so the attribute isn't found.  The difference is that
        # the old-style II doesn't have a NULL __mro__ (it doesn't have any
        # __mro__), so no segfault occurs.  Instead it got:
        #    test_callback_in_cycle_2 (__main__.ReferencesTestCase) ...
        #    Exception exceptions.AttributeError:
        #   "II instance has no attribute 'J'" in <bound method II.acallback
        #       of <?.II instance at 0x00B9B4B8>> ignored

        class J(object):
            pass

        # No base class, i.e. an old-style class on Python 2.
        class II:
            def acallback(self, ignore):
                self.J

        I = II()
        I.J = J
        I.wr = weakref.ref(J, I.acallback)

        # Turn all three into cyclic trash, then let gc deal with them.
        del I, J, II
        gc.collect()
0459 
    def test_callback_in_cycle_3(self):
        # Third cyclic-trash callback scenario.  There are no assertions;
        # the test only runs the scenario described below.
        import gc

        # This one broke the first patch that fixed the last two.  In this
        # case, the objects reachable from the callback aren't also reachable
        # from the object (c1) *triggering* the callback:  you can get to
        # c1 from c2, but not vice-versa.  The result was that c2's __dict__
        # got tp_clear'ed by the time the c2.cb callback got invoked.

        # Local class; inside this method the name shadows the module-level C.
        class C:
            def cb(self, ignore):
                # Touch every attribute stored on the instance below.
                self.me
                self.c1
                self.wr

        c1, c2 = C(), C()

        # c2 refers to itself (a cycle), strongly to c1, and weakly to c1
        # with its own bound method as the callback.
        c2.me = c2
        c2.c1 = c1
        c2.wr = weakref.ref(c1, c2.cb)

        del c1, c2
        gc.collect()
0483 
    def test_callback_in_cycle_4(self):
        # Fourth cyclic-trash callback scenario.  There are no assertions;
        # the test only runs the scenario described below.
        import gc

        # Like test_callback_in_cycle_3, except c2 and c1 have different
        # classes.  c2's class (C) isn't reachable from c1 then, so protecting
        # objects reachable from the dying object (c1) isn't enough to stop
        # c2's class (C) from getting tp_clear'ed before c2.cb is invoked.
        # The result was a segfault (C.__mro__ was NULL when the callback
        # tried to look up self.me).

        # New-style this time; inside this method the name shadows the
        # module-level C.
        class C(object):
            def cb(self, ignore):
                self.me
                self.c1
                self.wr

        class D:
            pass

        c1, c2 = D(), C()

        c2.me = c2
        c2.c1 = c1
        c2.wr = weakref.ref(c1, c2.cb)

        # The classes are deleted too, so they become trash with the
        # instances.
        del c1, c2, C, D
        gc.collect()
0511 
    def test_callback_in_cycle_resurrection(self):
        # Weakref callbacks that would resurrect objects from a dead cycle
        # must never run; ``alist`` records every callback that does.
        import gc

        # Do something nasty in a weakref callback:  resurrect objects
        # from dead cycles.  For this to be attempted, the weakref and
        # its callback must also be part of the cyclic trash (else the
        # objects reachable via the callback couldn't be in cyclic trash
        # to begin with -- the callback would act like an external root).
        # But gc clears trash weakrefs with callbacks early now, which
        # disables the callbacks, so the callbacks shouldn't get called
        # at all (and so nothing actually gets resurrected).

        alist = []
        class C(object):
            def __init__(self, value):
                self.attribute = value

            def acallback(self, ignore):
                # Would put the partner object back into a live list.
                alist.append(self.c)

        # Two instances that refer to each other strongly (.c) and weakly
        # (.wr), each weak reference using the holder's bound method as its
        # callback.
        c1, c2 = C(1), C(2)
        c1.c = c2
        c2.c = c1
        c1.wr = weakref.ref(c2, c1.acallback)
        c2.wr = weakref.ref(c1, c2.acallback)

        def C_went_away(ignore):
            alist.append("C went away")
        wr = weakref.ref(C, C_went_away)

        del c1, c2, C   # make them all trash
        self.assertEqual(alist, [])  # del isn't enough to reclaim anything

        gc.collect()
        # c1.wr and c2.wr were part of the cyclic trash, so should have
        # been cleared without their callbacks executing.  OTOH, the weakref
        # to C is bound to a function local (wr), and wasn't trash, so that
        # callback should have been invoked when C went away.
        self.assertEqual(alist, ["C went away"])
        # The remaining weakref should be dead now (its callback ran).
        self.assertEqual(wr(), None)

        # A second collection must not trigger any further callbacks.
        del alist[:]
        gc.collect()
        self.assertEqual(alist, [])
0557 
    def test_callbacks_on_callback(self):
        # Weak references whose referent is itself a weakref callback:
        # only the one held outside the cyclic trash may fire.  ``alist``
        # records every callback that runs.
        import gc

        # Set up weakref callbacks *on* weakref callbacks.
        alist = []
        def safe_callback(ignore):
            alist.append("safe_callback called")

        class C(object):
            def cb(self, ignore):
                alist.append("cb called")

        # c and d refer to each other, forming a cycle.
        c, d = C(), C()
        c.other = d
        d.other = c
        callback = c.cb
        c.wr = weakref.ref(d, callback)     # this won't trigger
        d.wr = weakref.ref(callback, d.cb)  # ditto
        external_wr = weakref.ref(callback, safe_callback)  # but this will
        self.assert_(external_wr() is callback)

        # The weakrefs attached to c and d should get cleared, so that
        # C.cb is never called.  But external_wr isn't part of the cyclic
        # trash, and no cyclic trash is reachable from it, so safe_callback
        # should get invoked when the bound method object callback (c.cb)
        # -- which is itself a callback, and also part of the cyclic trash --
        # gets reclaimed at the end of gc.

        del callback, c, d, C
        self.assertEqual(alist, [])  # del isn't enough to clean up cycles
        gc.collect()
        self.assertEqual(alist, ["safe_callback called"])
        self.assertEqual(external_wr(), None)

        # A second collection must not trigger any further callbacks.
        del alist[:]
        gc.collect()
        self.assertEqual(alist, [])
0595 
0596     def test_gc_during_ref_creation(self):
0597         self.check_gc_during_creation(weakref.ref)
0598 
0599     def test_gc_during_proxy_creation(self):
0600         self.check_gc_during_creation(weakref.proxy)
0601 
0602     def check_gc_during_creation(self, makeref):
0603         thresholds = gc.get_threshold()
0604         gc.set_threshold(1, 1, 1)
0605         gc.collect()
0606         class A:
0607             pass
0608 
0609         def callback(*args):
0610             pass
0611 
0612         referenced = A()
0613 
0614         a = A()
0615         a.a = a
0616         a.wr = makeref(referenced)
0617 
0618         try:
0619             # now make sure the object and the ref get labeled as
0620             # cyclic trash:
0621             a = A()
0622             weakref.ref(referenced, callback)
0623 
0624         finally:
0625             gc.set_threshold(*thresholds)
0626 
0627 
class SubclassableWeakrefTestCase(unittest.TestCase):
    # weakref.ref can be subclassed; these tests check how instances of
    # such subclasses coexist with ordinary references to the same object.

    def test_subclass_refs(self):
        # A subclass may add state in __init__ and override __call__.
        class MyRef(weakref.ref):
            def __init__(self, ob, callback=None, value=42):
                self.value = value
                super(MyRef, self).__init__(ob, callback)
            def __call__(self):
                self.called = True
                return super(MyRef, self).__call__()
        target = Object("foo")
        tracked = MyRef(target, value=24)
        self.assert_(tracked() is target)
        self.assert_(tracked.called)
        self.assertEqual(tracked.value, 24)
        del target
        self.assert_(tracked() is None)
        self.assert_(tracked.called)

    def test_subclass_refs_dont_replace_standard_refs(self):
        class MyRef(weakref.ref):
            pass
        target = Object(42)
        custom_a = MyRef(target)
        standard = weakref.ref(target)
        self.assert_(custom_a is not standard)
        self.assertEqual(weakref.getweakrefs(target), [standard, custom_a])
        self.assertEqual(weakref.getweakrefcount(target), 2)
        # A second subclass instance is a new object, not a shared one.
        custom_b = MyRef(target)
        self.assertEqual(weakref.getweakrefcount(target), 3)
        refs = weakref.getweakrefs(target)
        self.assertEqual(len(refs), 3)
        # The standard ref comes first; the order of the rest is not
        # checked.
        self.assert_(standard is refs[0])
        custom_refs = refs[1:]
        self.assert_(custom_a in custom_refs)
        self.assert_(custom_b in custom_refs)

    def test_subclass_refs_dont_conflate_callbacks(self):
        class MyRef(weakref.ref):
            pass
        target = Object(42)
        # Same referent, different callbacks: two distinct reference objects.
        with_id = MyRef(target, id)
        with_str = MyRef(target, str)
        self.assert_(with_id is not with_str)
        refs = weakref.getweakrefs(target)
        self.assert_(with_id in refs)
        self.assert_(with_str in refs)

    def test_subclass_refs_with_slots(self):
        class MyRef(weakref.ref):
            __slots__ = "slot1", "slot2"
            def __new__(cls, ob, callback, slot1, slot2):
                # The base constructor only takes the referent and callback.
                return weakref.ref.__new__(cls, ob, callback)
            def __init__(self, ob, callback, slot1, slot2):
                self.slot1 = slot1
                self.slot2 = slot2
            def meth(self):
                return self.slot1 + self.slot2
        target = Object(42)
        slotted = MyRef(target, None, "abc", "def")
        self.assertEqual(slotted.slot1, "abc")
        self.assertEqual(slotted.slot2, "def")
        self.assertEqual(slotted.meth(), "abcdef")
        # __slots__ must actually suppress the per-instance __dict__.
        self.failIf(hasattr(slotted, "__dict__"))
0691 
0692 
class Object:
    # Simple weak-referenceable value holder used by the tests.
    #
    # arg -- arbitrary payload, stored unchanged on the ``arg`` attribute.
    def __init__(self, arg):
        self.arg = arg

    def __repr__(self):
        # The payload is wrapped in a one-element tuple: with a bare
        # ``% self.arg`` a tuple payload would be taken as the full set of
        # format arguments and raise TypeError instead of being shown.
        return "<Object %r>" % (self.arg,)
0698 
0699 
0700 class MappingTestCase(TestBase):
0701 
0702     COUNT = 10
0703 
0704     def test_weak_values(self):
0705         #
0706         #  This exercises d.copy(), d.items(), d[], del d[], len(d).
0707         #
0708         dict, objects = self.make_weak_valued_dict()
0709         for o in objects:
0710             self.assert_(weakref.getweakrefcount(o) == 1,
0711                          "wrong number of weak references to %r!" % o)
0712             self.assert_(o is dict[o.arg],
0713                          "wrong object returned by weak dict!")
0714         items1 = dict.items()
0715         items2 = dict.copy().items()
0716         items1.sort()
0717         items2.sort()
0718         self.assert_(items1 == items2,
0719                      "cloning of weak-valued dictionary did not work!")
0720         del items1, items2
0721         self.assert_(len(dict) == self.COUNT)
0722         del objects[0]
0723         self.assert_(len(dict) == (self.COUNT - 1),
0724                      "deleting object did not cause dictionary update")
0725         del objects, o
0726         self.assert_(len(dict) == 0,
0727                      "deleting the values did not clear the dictionary")
0728         # regression on SF bug #447152:
0729         dict = weakref.WeakValueDictionary()
0730         self.assertRaises(KeyError, dict.__getitem__, 1)
0731         dict[2] = C()
0732         self.assertRaises(KeyError, dict.__getitem__, 2)
0733 
0734     def test_weak_keys(self):
0735         #
0736         #  This exercises d.copy(), d.items(), d[] = v, d[], del d[],
0737         #  len(d), d.has_key().
0738         #
0739         dict, objects = self.make_weak_keyed_dict()
0740         for o in objects:
0741             self.assert_(weakref.getweakrefcount(o) == 1,
0742                          "wrong number of weak references to %r!" % o)
0743             self.assert_(o.arg is dict[o],
0744                          "wrong object returned by weak dict!")
0745         items1 = dict.items()
0746         items2 = dict.copy().items()
0747         self.assert_(set(items1) == set(items2),
0748                      "cloning of weak-keyed dictionary did not work!")
0749         del items1, items2
0750         self.assert_(len(dict) == self.COUNT)
0751         del objects[0]
0752         self.assert_(len(dict) == (self.COUNT - 1),
0753                      "deleting object did not cause dictionary update")
0754         del objects, o
0755         self.assert_(len(dict) == 0,
0756                      "deleting the keys did not clear the dictionary")
0757         o = Object(42)
0758         dict[o] = "What is the meaning of the universe?"
0759         self.assert_(dict.has_key(o))
0760         self.assert_(not dict.has_key(34))
0761 
0762     def test_weak_keyed_iters(self):
0763         dict, objects = self.make_weak_keyed_dict()
0764         self.check_iters(dict)
0765 
0766     def test_weak_valued_iters(self):
0767         dict, objects = self.make_weak_valued_dict()
0768         self.check_iters(dict)
0769 
0770     def check_iters(self, dict):
0771         # item iterator:
0772         items = dict.items()
0773         for item in dict.iteritems():
0774             items.remove(item)
0775         self.assert_(len(items) == 0, "iteritems() did not touch all items")
0776 
0777         # key iterator, via __iter__():
0778         keys = dict.keys()
0779         for k in dict:
0780             keys.remove(k)
0781         self.assert_(len(keys) == 0, "__iter__() did not touch all keys")
0782 
0783         # key iterator, via iterkeys():
0784         keys = dict.keys()
0785         for k in dict.iterkeys():
0786             keys.remove(k)
0787         self.assert_(len(keys) == 0, "iterkeys() did not touch all keys")
0788 
0789         # value iterator:
0790         values = dict.values()
0791         for v in dict.itervalues():
0792             values.remove(v)
0793         self.assert_(len(values) == 0,
0794                      "itervalues() did not touch all values")
0795 
0796     def test_make_weak_keyed_dict_from_dict(self):
0797         o = Object(3)
0798         dict = weakref.WeakKeyDictionary({o:364})
0799         self.assert_(dict[o] == 364)
0800 
0801     def test_make_weak_keyed_dict_from_weak_keyed_dict(self):
0802         o = Object(3)
0803         dict = weakref.WeakKeyDictionary({o:364})
0804         dict2 = weakref.WeakKeyDictionary(dict)
0805         self.assert_(dict[o] == 364)
0806 
0807     def make_weak_keyed_dict(self):
0808         dict = weakref.WeakKeyDictionary()
0809         objects = map(Object, range(self.COUNT))
0810         for o in objects:
0811             dict[o] = o.arg
0812         return dict, objects
0813 
0814     def make_weak_valued_dict(self):
0815         dict = weakref.WeakValueDictionary()
0816         objects = map(Object, range(self.COUNT))
0817         for o in objects:
0818             dict[o.arg] = o
0819         return dict, objects
0820 
0821     def check_popitem(self, klass, key1, value1, key2, value2):
0822         weakdict = klass()
0823         weakdict[key1] = value1
0824         weakdict[key2] = value2
0825         self.assert_(len(weakdict) == 2)
0826         k, v = weakdict.popitem()
0827         self.assert_(len(weakdict) == 1)
0828         if k is key1:
0829             self.assert_(v is value1)
0830         else:
0831             self.assert_(v is value2)
0832         k, v = weakdict.popitem()
0833         self.assert_(len(weakdict) == 0)
0834         if k is key1:
0835             self.assert_(v is value1)
0836         else:
0837             self.assert_(v is value2)
0838 
0839     def test_weak_valued_dict_popitem(self):
0840         self.check_popitem(weakref.WeakValueDictionary,
0841                            "key1", C(), "key2", C())
0842 
0843     def test_weak_keyed_dict_popitem(self):
0844         self.check_popitem(weakref.WeakKeyDictionary,
0845                            C(), "value 1", C(), "value 2")
0846 
0847     def check_setdefault(self, klass, key, value1, value2):
0848         self.assert_(value1 is not value2,
0849                      "invalid test"
0850                      " -- value parameters must be distinct objects")
0851         weakdict = klass()
0852         o = weakdict.setdefault(key, value1)
0853         self.assert_(o is value1)
0854         self.assert_(weakdict.has_key(key))
0855         self.assert_(weakdict.get(key) is value1)
0856         self.assert_(weakdict[key] is value1)
0857 
0858         o = weakdict.setdefault(key, value2)
0859         self.assert_(o is value1)
0860         self.assert_(weakdict.has_key(key))
0861         self.assert_(weakdict.get(key) is value1)
0862         self.assert_(weakdict[key] is value1)
0863 
0864     def test_weak_valued_dict_setdefault(self):
0865         self.check_setdefault(weakref.WeakValueDictionary,
0866                               "key", C(), C())
0867 
0868     def test_weak_keyed_dict_setdefault(self):
0869         self.check_setdefault(weakref.WeakKeyDictionary,
0870                               C(), "value 1", "value 2")
0871 
0872     def check_update(self, klass, dict):
0873         #
0874         #  This exercises d.update(), len(d), d.keys(), d.has_key(),
0875         #  d.get(), d[].
0876         #
0877         weakdict = klass()
0878         weakdict.update(dict)
0879         self.assert_(len(weakdict) == len(dict))
0880         for k in weakdict.keys():
0881             self.assert_(dict.has_key(k),
0882                          "mysterious new key appeared in weak dict")
0883             v = dict.get(k)
0884             self.assert_(v is weakdict[k])
0885             self.assert_(v is weakdict.get(k))
0886         for k in dict.keys():
0887             self.assert_(weakdict.has_key(k),
0888                          "original key disappeared in weak dict")
0889             v = dict[k]
0890             self.assert_(v is weakdict[k])
0891             self.assert_(v is weakdict.get(k))
0892 
0893     def test_weak_valued_dict_update(self):
0894         self.check_update(weakref.WeakValueDictionary,
0895                           {1: C(), 'a': C(), C(): C()})
0896 
0897     def test_weak_keyed_dict_update(self):
0898         self.check_update(weakref.WeakKeyDictionary,
0899                           {C(): 1, C(): 2, C(): 3})
0900 
0901     def test_weak_keyed_delitem(self):
0902         d = weakref.WeakKeyDictionary()
0903         o1 = Object('1')
0904         o2 = Object('2')
0905         d[o1] = 'something'
0906         d[o2] = 'something'
0907         self.assert_(len(d) == 2)
0908         del d[o1]
0909         self.assert_(len(d) == 1)
0910         self.assert_(d.keys() == [o2])
0911 
0912     def test_weak_valued_delitem(self):
0913         d = weakref.WeakValueDictionary()
0914         o1 = Object('1')
0915         o2 = Object('2')
0916         d['something'] = o1
0917         d['something else'] = o2
0918         self.assert_(len(d) == 2)
0919         del d['something']
0920         self.assert_(len(d) == 1)
0921         self.assert_(d.items() == [('something else', o2)])
0922 
0923     def test_weak_keyed_bad_delitem(self):
0924         d = weakref.WeakKeyDictionary()
0925         o = Object('1')
0926         # An attempt to delete an object that isn't there should raise
0927         # KeyError.  It didn't before 2.3.
0928         self.assertRaises(KeyError, d.__delitem__, o)
0929         self.assertRaises(KeyError, d.__getitem__, o)
0930 
0931         # If a key isn't of a weakly referencable type, __getitem__ and
0932         # __setitem__ raise TypeError.  __delitem__ should too.
0933         self.assertRaises(TypeError, d.__delitem__,  13)
0934         self.assertRaises(TypeError, d.__getitem__,  13)
0935         self.assertRaises(TypeError, d.__setitem__,  13, 13)
0936 
    def test_weak_keyed_cascading_deletes(self):
        # SF bug 742860.  For some reason, before 2.3 __delitem__ iterated
        # over the keys via self.data.iterkeys().  If things vanished from
        # the dict during this (or got added), that caused a RuntimeError.

        d = weakref.WeakKeyDictionary()
        # Switch read by C.__eq__ below; left off while d is populated so
        # that comparisons made during setup have no side effect.
        mutate = False

        # Local key class (shadows the module-level C inside this method).
        # Hash and equality are both based on the wrapped value.
        class C(object):
            def __init__(self, i):
                self.value = i
            def __hash__(self):
                return hash(self.value)
            def __eq__(self, other):
                if mutate:
                    # Side effect that mutates the dict, by removing the
                    # last strong reference to a key.
                    del objs[-1]
                return self.value == other.value

        objs = [C(i) for i in range(4)]
        for o in objs:
            d[o] = o.value
        del o   # now the only strong references to keys are in objs
        # Find the order in which iterkeys sees the keys.
        objs = d.keys()
        # Reverse it, so that the iteration implementation of __delitem__
        # has to keep looping to find the first object we delete.
        objs.reverse()

        # Turn on mutation in C.__eq__.  The first time thru the loop,
        # under the iterkeys() business the first comparison will delete
        # the last item iterkeys() would see, and that causes a
        #     RuntimeError: dictionary changed size during iteration
        # when the iterkeys() loop goes around to try comparing the next
        # key.  After this was fixed, it just deletes the last object *our*
        # "for o in objs" loop would have gotten to.
        mutate = True
        count = 0
        for o in objs:
            count += 1
            del d[o]
        # Each explicit deletion is expected to make C.__eq__ drop one more
        # key (the current tail of objs), so the 4-key dict should be empty
        # after only two trips through the loop.
        self.assertEqual(len(d), 0)
        self.assertEqual(count, 2)
0981 
0982 from test import mapping_tests
0983 
class WeakValueDictionaryTestCase(mapping_tests.BasicTestMappingProtocol):
    """Run the generic mapping-protocol tests on WeakValueDictionary."""

    type2test = weakref.WeakValueDictionary

    # Reference data, created once when the class body runs.  Keeping it
    # on the class keeps the Object values referenced.
    __ref = dict([
        ("key1", Object(1)),
        ("key2", Object(2)),
        ("key3", Object(3)),
        ])

    def _reference(self):
        # Hand out a fresh plain-dict copy of the reference data.
        return dict(self.__ref)
0990 
class WeakKeyDictionaryTestCase(mapping_tests.BasicTestMappingProtocol):
    """Run the generic mapping-protocol tests on WeakKeyDictionary."""

    type2test = weakref.WeakKeyDictionary

    # Reference data, created once when the class body runs.  Keeping it
    # on the class keeps the Object keys referenced.
    __ref = dict([
        (Object("key1"), 1),
        (Object("key2"), 2),
        (Object("key3"), 3),
        ])

    def _reference(self):
        # Hand out a fresh plain-dict copy of the reference data.
        return dict(self.__ref)
0997 
def test_main():
    """Run the test case classes listed below via test_support."""
    test_classes = (
        ReferencesTestCase,
        MappingTestCase,
        WeakValueDictionaryTestCase,
        WeakKeyDictionaryTestCase,
        )
    test_support.run_unittest(*test_classes)
1005 
1006 
# Run the tests only when this file is executed as a script, not when it
# is imported.
if __name__ == "__main__":
    test_main()
1009 

Generated by PyXR 0.9.4
SourceForge.net Logo