0001 import unittest 0002 from test import test_support 0003 from itertools import * 0004 from weakref import proxy 0005 import sys 0006 import operator 0007 import random 0008 0009 def onearg(x): 0010 'Test function of one argument' 0011 return 2*x 0012 0013 def errfunc(*args): 0014 'Test function that raises an error' 0015 raise ValueError 0016 0017 def gen3(): 0018 'Non-restartable source sequence' 0019 for i in (0, 1, 2): 0020 yield i 0021 0022 def isEven(x): 0023 'Test predicate' 0024 return x%2==0 0025 0026 def isOdd(x): 0027 'Test predicate' 0028 return x%2==1 0029 0030 class StopNow: 0031 'Class emulating an empty iterable.' 0032 def __iter__(self): 0033 return self 0034 def next(self): 0035 raise StopIteration 0036 0037 def take(n, seq): 0038 'Convenience function for partially consuming a long of infinite iterable' 0039 return list(islice(seq, n)) 0040 0041 class TestBasicOps(unittest.TestCase): 0042 def test_chain(self): 0043 self.assertEqual(list(chain('abc', 'def')), list('abcdef')) 0044 self.assertEqual(list(chain('abc')), list('abc')) 0045 self.assertEqual(list(chain('')), []) 0046 self.assertEqual(take(4, chain('abc', 'def')), list('abcd')) 0047 self.assertRaises(TypeError, chain, 2, 3) 0048 0049 def test_count(self): 0050 self.assertEqual(zip('abc',count()), [('a', 0), ('b', 1), ('c', 2)]) 0051 self.assertEqual(zip('abc',count(3)), [('a', 3), ('b', 4), ('c', 5)]) 0052 self.assertEqual(take(2, zip('abc',count(3))), [('a', 3), ('b', 4)]) 0053 self.assertRaises(TypeError, count, 2, 3) 0054 self.assertRaises(TypeError, count, 'a') 0055 c = count(sys.maxint-2) # verify that rollover doesn't crash 0056 c.next(); c.next(); c.next(); c.next(); c.next() 0057 c = count(3) 0058 self.assertEqual(repr(c), 'count(3)') 0059 c.next() 0060 self.assertEqual(repr(c), 'count(4)') 0061 0062 def test_cycle(self): 0063 self.assertEqual(take(10, cycle('abc')), list('abcabcabca')) 0064 self.assertEqual(list(cycle('')), []) 0065 self.assertRaises(TypeError, cycle) 0066 self.assertRaises(TypeError, cycle, 5) 0067 self.assertEqual(list(islice(cycle(gen3()),10)), [0,1,2,0,1,2,0,1,2,0]) 0068 0069 def test_groupby(self): 0070 # Check whether it accepts arguments correctly 0071 self.assertEqual([], list(groupby([]))) 0072 self.assertEqual([], list(groupby([], key=id))) 0073 self.assertRaises(TypeError, list, groupby('abc', [])) 0074 self.assertRaises(TypeError, groupby, None) 0075 self.assertRaises(TypeError, groupby, 'abc', lambda x:x, 10) 0076 0077 # Check normal input 0078 s = [(0, 10, 20), (0, 11,21), (0,12,21), (1,13,21), (1,14,22), 0079 (2,15,22), (3,16,23), (3,17,23)] 0080 dup = [] 0081 for k, g in groupby(s, lambda r:r[0]): 0082 for elem in g: 0083 self.assertEqual(k, elem[0]) 0084 dup.append(elem) 0085 self.assertEqual(s, dup) 0086 0087 # Check nested case 0088 dup = [] 0089 for k, g in groupby(s, lambda r:r[0]): 0090 for ik, ig in groupby(g, lambda r:r[2]): 0091 for elem in ig: 0092 self.assertEqual(k, elem[0]) 0093 self.assertEqual(ik, elem[2]) 0094 dup.append(elem) 0095 self.assertEqual(s, dup) 0096 0097 # Check case where inner iterator is not used 0098 keys = [k for k, g in groupby(s, lambda r:r[0])] 0099 expectedkeys = set([r[0] for r in s]) 0100 self.assertEqual(set(keys), expectedkeys) 0101 self.assertEqual(len(keys), len(expectedkeys)) 0102 0103 # Exercise pipes and filters style 0104 s = 'abracadabra' 0105 # sort s | uniq 0106 r = [k for k, g in groupby(sorted(s))] 0107 self.assertEqual(r, ['a', 'b', 'c', 'd', 'r']) 0108 # sort s | uniq -d 0109 r = [k for k, g in groupby(sorted(s)) if list(islice(g,1,2))] 0110 self.assertEqual(r, ['a', 'b', 'r']) 0111 # sort s | uniq -c 0112 r = [(len(list(g)), k) for k, g in groupby(sorted(s))] 0113 self.assertEqual(r, [(5, 'a'), (2, 'b'), (1, 'c'), (1, 'd'), (2, 'r')]) 0114 # sort s | uniq -c | sort -rn | head -3 0115 r = sorted([(len(list(g)) , k) for k, g in groupby(sorted(s))], reverse=True)[:3] 0116 self.assertEqual(r, [(5, 'a'), (2, 'r'), (2, 'b')]) 0117 0118 # iter.next failure 0119 class ExpectedError(Exception): 0120 pass 0121 def delayed_raise(n=0): 0122 for i in range(n): 0123 yield 'yo' 0124 raise ExpectedError 0125 def gulp(iterable, keyp=None, func=list): 0126 return [func(g) for k, g in groupby(iterable, keyp)] 0127 0128 # iter.next failure on outer object 0129 self.assertRaises(ExpectedError, gulp, delayed_raise(0)) 0130 # iter.next failure on inner object 0131 self.assertRaises(ExpectedError, gulp, delayed_raise(1)) 0132 0133 # __cmp__ failure 0134 class DummyCmp: 0135 def __cmp__(self, dst): 0136 raise ExpectedError 0137 s = [DummyCmp(), DummyCmp(), None] 0138 0139 # __cmp__ failure on outer object 0140 self.assertRaises(ExpectedError, gulp, s, func=id) 0141 # __cmp__ failure on inner object 0142 self.assertRaises(ExpectedError, gulp, s) 0143 0144 # keyfunc failure 0145 def keyfunc(obj): 0146 if keyfunc.skip > 0: 0147 keyfunc.skip -= 1 0148 return obj 0149 else: 0150 raise ExpectedError 0151 0152 # keyfunc failure on outer object 0153 keyfunc.skip = 0 0154 self.assertRaises(ExpectedError, gulp, [None], keyfunc) 0155 keyfunc.skip = 1 0156 self.assertRaises(ExpectedError, gulp, [None, None], keyfunc) 0157 0158 def test_ifilter(self): 0159 self.assertEqual(list(ifilter(isEven, range(6))), [0,2,4]) 0160 self.assertEqual(list(ifilter(None, [0,1,0,2,0])), [1,2]) 0161 self.assertEqual(take(4, ifilter(isEven, count())), [0,2,4,6]) 0162 self.assertRaises(TypeError, ifilter) 0163 self.assertRaises(TypeError, ifilter, lambda x:x) 0164 self.assertRaises(TypeError, ifilter, lambda x:x, range(6), 7) 0165 self.assertRaises(TypeError, ifilter, isEven, 3) 0166 self.assertRaises(TypeError, ifilter(range(6), range(6)).next) 0167 0168 def test_ifilterfalse(self): 0169 self.assertEqual(list(ifilterfalse(isEven, range(6))), [1,3,5]) 0170 self.assertEqual(list(ifilterfalse(None, [0,1,0,2,0])), [0,0,0]) 0171 self.assertEqual(take(4, ifilterfalse(isEven, count())), [1,3,5,7]) 0172 self.assertRaises(TypeError, ifilterfalse) 0173 self.assertRaises(TypeError, ifilterfalse, lambda x:x) 0174 self.assertRaises(TypeError, ifilterfalse, lambda x:x, range(6), 7) 0175 self.assertRaises(TypeError, ifilterfalse, isEven, 3) 0176 self.assertRaises(TypeError, ifilterfalse(range(6), range(6)).next) 0177 0178 def test_izip(self): 0179 ans = [(x,y) for x, y in izip('abc',count())] 0180 self.assertEqual(ans, [('a', 0), ('b', 1), ('c', 2)]) 0181 self.assertEqual(list(izip('abc', range(6))), zip('abc', range(6))) 0182 self.assertEqual(list(izip('abcdef', range(3))), zip('abcdef', range(3))) 0183 self.assertEqual(take(3,izip('abcdef', count())), zip('abcdef', range(3))) 0184 self.assertEqual(list(izip('abcdef')), zip('abcdef')) 0185 self.assertEqual(list(izip()), zip()) 0186 self.assertRaises(TypeError, izip, 3) 0187 self.assertRaises(TypeError, izip, range(3), 3) 0188 # Check tuple re-use (implementation detail) 0189 self.assertEqual([tuple(list(pair)) for pair in izip('abc', 'def')], 0190 zip('abc', 'def')) 0191 self.assertEqual([pair for pair in izip('abc', 'def')], 0192 zip('abc', 'def')) 0193 ids = map(id, izip('abc', 'def')) 0194 self.assertEqual(min(ids), max(ids)) 0195 ids = map(id, list(izip('abc', 'def'))) 0196 self.assertEqual(len(dict.fromkeys(ids)), len(ids)) 0197 0198 def test_repeat(self): 0199 self.assertEqual(zip(xrange(3),repeat('a')), 0200 [(0, 'a'), (1, 'a'), (2, 'a')]) 0201 self.assertEqual(list(repeat('a', 3)), ['a', 'a', 'a']) 0202 self.assertEqual(take(3, repeat('a')), ['a', 'a', 'a']) 0203 self.assertEqual(list(repeat('a', 0)), []) 0204 self.assertEqual(list(repeat('a', -3)), []) 0205 self.assertRaises(TypeError, repeat) 0206 self.assertRaises(TypeError, repeat, None, 3, 4) 0207 self.assertRaises(TypeError, repeat, None, 'a') 0208 r = repeat(1+0j) 0209 self.assertEqual(repr(r), 'repeat((1+0j))') 0210 r = repeat(1+0j, 5) 0211 self.assertEqual(repr(r), 'repeat((1+0j), 5)') 0212 list(r) 0213 self.assertEqual(repr(r), 'repeat((1+0j), 0)') 0214 0215 def test_imap(self): 0216 self.assertEqual(list(imap(operator.pow, range(3), range(1,7))), 0217 [0**1, 1**2, 2**3]) 0218 self.assertEqual(list(imap(None, 'abc', range(5))), 0219 [('a',0),('b',1),('c',2)]) 0220 self.assertEqual(list(imap(None, 'abc', count())), 0221 [('a',0),('b',1),('c',2)]) 0222 self.assertEqual(take(2,imap(None, 'abc', count())), 0223 [('a',0),('b',1)]) 0224 self.assertEqual(list(imap(operator.pow, [])), []) 0225 self.assertRaises(TypeError, imap) 0226 self.assertRaises(TypeError, imap, operator.neg) 0227 self.assertRaises(TypeError, imap(10, range(5)).next) 0228 self.assertRaises(ValueError, imap(errfunc, [4], [5]).next) 0229 self.assertRaises(TypeError, imap(onearg, [4], [5]).next) 0230 0231 def test_starmap(self): 0232 self.assertEqual(list(starmap(operator.pow, zip(range(3), range(1,7)))), 0233 [0**1, 1**2, 2**3]) 0234 self.assertEqual(take(3, starmap(operator.pow, izip(count(), count(1)))), 0235 [0**1, 1**2, 2**3]) 0236 self.assertEqual(list(starmap(operator.pow, [])), []) 0237 self.assertRaises(TypeError, list, starmap(operator.pow, [[4,5]])) 0238 self.assertRaises(TypeError, starmap) 0239 self.assertRaises(TypeError, starmap, operator.pow, [(4,5)], 'extra') 0240 self.assertRaises(TypeError, starmap(10, [(4,5)]).next) 0241 self.assertRaises(ValueError, starmap(errfunc, [(4,5)]).next) 0242 self.assertRaises(TypeError, starmap(onearg, [(4,5)]).next) 0243 0244 def test_islice(self): 0245 for args in [ # islice(args) should agree with range(args) 0246 (10, 20, 3), 0247 (10, 3, 20), 0248 (10, 20), 0249 (10, 3), 0250 (20,) 0251 ]: 0252 self.assertEqual(list(islice(xrange(100), *args)), range(*args)) 0253 0254 for args, tgtargs in [ # Stop when seqn is exhausted 0255 ((10, 110, 3), ((10, 100, 3))), 0256 ((10, 110), ((10, 100))), 0257 ((110,), (100,)) 0258 ]: 0259 self.assertEqual(list(islice(xrange(100), *args)), range(*tgtargs)) 0260 0261 # Test stop=None 0262 self.assertEqual(list(islice(xrange(10), None)), range(10)) 0263 self.assertEqual(list(islice(xrange(10), 2, None)), range(2, 10)) 0264 self.assertEqual(list(islice(xrange(10), 1, None, 2)), range(1, 10, 2)) 0265 0266 # Test invalid arguments 0267 self.assertRaises(TypeError, islice, xrange(10)) 0268 self.assertRaises(TypeError, islice, xrange(10), 1, 2, 3, 4) 0269 self.assertRaises(ValueError, islice, xrange(10), -5, 10, 1) 0270 self.assertRaises(ValueError, islice, xrange(10), 1, -5, -1) 0271 self.assertRaises(ValueError, islice, xrange(10), 1, 10, -1) 0272 self.assertRaises(ValueError, islice, xrange(10), 1, 10, 0) 0273 self.assertRaises(ValueError, islice, xrange(10), 'a') 0274 self.assertRaises(ValueError, islice, xrange(10), 'a', 1) 0275 self.assertRaises(ValueError, islice, xrange(10), 1, 'a') 0276 self.assertRaises(ValueError, islice, xrange(10), 'a', 1, 1) 0277 self.assertRaises(ValueError, islice, xrange(10), 1, 'a', 1) 0278 self.assertEqual(len(list(islice(count(), 1, 10, sys.maxint))), 1) 0279 0280 def test_takewhile(self): 0281 data = [1, 3, 5, 20, 2, 4, 6, 8] 0282 underten = lambda x: x<10 0283 self.assertEqual(list(takewhile(underten, data)), [1, 3, 5]) 0284 self.assertEqual(list(takewhile(underten, [])), []) 0285 self.assertRaises(TypeError, takewhile) 0286 self.assertRaises(TypeError, takewhile, operator.pow) 0287 self.assertRaises(TypeError, takewhile, operator.pow, [(4,5)], 'extra') 0288 self.assertRaises(TypeError, takewhile(10, [(4,5)]).next) 0289 self.assertRaises(ValueError, takewhile(errfunc, [(4,5)]).next) 0290 t = takewhile(bool, [1, 1, 1, 0, 0, 0]) 0291 self.assertEqual(list(t), [1, 1, 1]) 0292 self.assertRaises(StopIteration, t.next) 0293 0294 def test_dropwhile(self): 0295 data = [1, 3, 5, 20, 2, 4, 6, 8] 0296 underten = lambda x: x<10 0297 self.assertEqual(list(dropwhile(underten, data)), [20, 2, 4, 6, 8]) 0298 self.assertEqual(list(dropwhile(underten, [])), []) 0299 self.assertRaises(TypeError, dropwhile) 0300 self.assertRaises(TypeError, dropwhile, operator.pow) 0301 self.assertRaises(TypeError, dropwhile, operator.pow, [(4,5)], 'extra') 0302 self.assertRaises(TypeError, dropwhile(10, [(4,5)]).next) 0303 self.assertRaises(ValueError, dropwhile(errfunc, [(4,5)]).next) 0304 0305 def test_tee(self): 0306 n = 200 0307 def irange(n): 0308 for i in xrange(n): 0309 yield i 0310 0311 a, b = tee([]) # test empty iterator 0312 self.assertEqual(list(a), []) 0313 self.assertEqual(list(b), []) 0314 0315 a, b = tee(irange(n)) # test 100% interleaved 0316 self.assertEqual(zip(a,b), zip(range(n),range(n))) 0317 0318 a, b = tee(irange(n)) # test 0% interleaved 0319 self.assertEqual(list(a), range(n)) 0320 self.assertEqual(list(b), range(n)) 0321 0322 a, b = tee(irange(n)) # test dealloc of leading iterator 0323 for i in xrange(100): 0324 self.assertEqual(a.next(), i) 0325 del a 0326 self.assertEqual(list(b), range(n)) 0327 0328 a, b = tee(irange(n)) # test dealloc of trailing iterator 0329 for i in xrange(100): 0330 self.assertEqual(a.next(), i) 0331 del b 0332 self.assertEqual(list(a), range(100, n)) 0333 0334 for j in xrange(5): # test randomly interleaved 0335 order = [0]*n + [1]*n 0336 random.shuffle(order) 0337 lists = ([], []) 0338 its = tee(irange(n)) 0339 for i in order: 0340 value = its[i].next() 0341 lists[i].append(value) 0342 self.assertEqual(lists[0], range(n)) 0343 self.assertEqual(lists[1], range(n)) 0344 0345 # test argument format checking 0346 self.assertRaises(TypeError, tee) 0347 self.assertRaises(TypeError, tee, 3) 0348 self.assertRaises(TypeError, tee, [1,2], 'x') 0349 self.assertRaises(TypeError, tee, [1,2], 3, 'x') 0350 0351 # tee object should be instantiable 0352 a, b = tee('abc') 0353 c = type(a)('def') 0354 self.assertEqual(list(c), list('def')) 0355 0356 # test long-lagged and multi-way split 0357 a, b, c = tee(xrange(2000), 3) 0358 for i in xrange(100): 0359 self.assertEqual(a.next(), i) 0360 self.assertEqual(list(b), range(2000)) 0361 self.assertEqual([c.next(), c.next()], range(2)) 0362 self.assertEqual(list(a), range(100,2000)) 0363 self.assertEqual(list(c), range(2,2000)) 0364 0365 # test values of n 0366 self.assertRaises(TypeError, tee, 'abc', 'invalid') 0367 for n in xrange(5): 0368 result = tee('abc', n) 0369 self.assertEqual(type(result), tuple) 0370 self.assertEqual(len(result), n) 0371 self.assertEqual(map(list, result), [list('abc')]*n) 0372 0373 # tee pass-through to copyable iterator 0374 a, b = tee('abc') 0375 c, d = tee(a) 0376 self.assert_(a is c) 0377 0378 # test tee_new 0379 t1, t2 = tee('abc') 0380 tnew = type(t1) 0381 self.assertRaises(TypeError, tnew) 0382 self.assertRaises(TypeError, tnew, 10) 0383 t3 = tnew(t1) 0384 self.assert_(list(t1) == list(t2) == list(t3) == list('abc')) 0385 0386 # test that tee objects are weak referencable 0387 a, b = tee(xrange(10)) 0388 p = proxy(a) 0389 self.assertEqual(getattr(p, '__class__'), type(b)) 0390 del a 0391 self.assertRaises(ReferenceError, getattr, p, '__class__') 0392 0393 def test_StopIteration(self): 0394 self.assertRaises(StopIteration, izip().next) 0395 0396 for f in (chain, cycle, izip, groupby): 0397 self.assertRaises(StopIteration, f([]).next) 0398 self.assertRaises(StopIteration, f(StopNow()).next) 0399 0400 self.assertRaises(StopIteration, islice([], None).next) 0401 self.assertRaises(StopIteration, islice(StopNow(), None).next) 0402 0403 p, q = tee([]) 0404 self.assertRaises(StopIteration, p.next) 0405 self.assertRaises(StopIteration, q.next) 0406 p, q = tee(StopNow()) 0407 self.assertRaises(StopIteration, p.next) 0408 self.assertRaises(StopIteration, q.next) 0409 0410 self.assertRaises(StopIteration, repeat(None, 0).next) 0411 0412 for f in (ifilter, ifilterfalse, imap, takewhile, dropwhile, starmap): 0413 self.assertRaises(StopIteration, f(lambda x:x, []).next) 0414 self.assertRaises(StopIteration, f(lambda x:x, StopNow()).next) 0415 0416 class TestGC(unittest.TestCase): 0417 0418 def makecycle(self, iterator, container): 0419 container.append(iterator) 0420 iterator.next() 0421 del container, iterator 0422 0423 def test_chain(self): 0424 a = [] 0425 self.makecycle(chain(a), a) 0426 0427 def test_cycle(self): 0428 a = [] 0429 self.makecycle(cycle([a]*2), a) 0430 0431 def test_dropwhile(self): 0432 a = [] 0433 self.makecycle(dropwhile(bool, [0, a, a]), a) 0434 0435 def test_groupby(self): 0436 a = [] 0437 self.makecycle(groupby([a]*2, lambda x:x), a) 0438 0439 def test_ifilter(self): 0440 a = [] 0441 self.makecycle(ifilter(lambda x:True, [a]*2), a) 0442 0443 def test_ifilterfalse(self): 0444 a = [] 0445 self.makecycle(ifilterfalse(lambda x:False, a), a) 0446 0447 def test_izip(self): 0448 a = [] 0449 self.makecycle(izip([a]*2, [a]*3), a) 0450 0451 def test_imap(self): 0452 a = [] 0453 self.makecycle(imap(lambda x:x, [a]*2), a) 0454 0455 def test_islice(self): 0456 a = [] 0457 self.makecycle(islice([a]*2, None), a) 0458 0459 def test_repeat(self): 0460 a = [] 0461 self.makecycle(repeat(a), a) 0462 0463 def test_starmap(self): 0464 a = [] 0465 self.makecycle(starmap(lambda *t: t, [(a,a)]*2), a) 0466 0467 def test_takewhile(self): 0468 a = [] 0469 self.makecycle(takewhile(bool, [1, 0, a, a]), a) 0470 0471 def R(seqn): 0472 'Regular generator' 0473 for i in seqn: 0474 yield i 0475 0476 class G: 0477 'Sequence using __getitem__' 0478 def __init__(self, seqn): 0479 self.seqn = seqn 0480 def __getitem__(self, i): 0481 return self.seqn[i] 0482 0483 class I: 0484 'Sequence using iterator protocol' 0485 def __init__(self, seqn): 0486 self.seqn = seqn 0487 self.i = 0 0488 def __iter__(self): 0489 return self 0490 def next(self): 0491 if self.i >= len(self.seqn): raise StopIteration 0492 v = self.seqn[self.i] 0493 self.i += 1 0494 return v 0495 0496 class Ig: 0497 'Sequence using iterator protocol defined with a generator' 0498 def __init__(self, seqn): 0499 self.seqn = seqn 0500 self.i = 0 0501 def __iter__(self): 0502 for val in self.seqn: 0503 yield val 0504 0505 class X: 0506 'Missing __getitem__ and __iter__' 0507 def __init__(self, seqn): 0508 self.seqn = seqn 0509 self.i = 0 0510 def next(self): 0511 if self.i >= len(self.seqn): raise StopIteration 0512 v = self.seqn[self.i] 0513 self.i += 1 0514 return v 0515 0516 class N: 0517 'Iterator missing next()' 0518 def __init__(self, seqn): 0519 self.seqn = seqn 0520 self.i = 0 0521 def __iter__(self): 0522 return self 0523 0524 class E: 0525 'Test propagation of exceptions' 0526 def __init__(self, seqn): 0527 self.seqn = seqn 0528 self.i = 0 0529 def __iter__(self): 0530 return self 0531 def next(self): 0532 3 // 0 0533 0534 class S: 0535 'Test immediate stop' 0536 def __init__(self, seqn): 0537 pass 0538 def __iter__(self): 0539 return self 0540 def next(self): 0541 raise StopIteration 0542 0543 def L(seqn): 0544 'Test multiple tiers of iterators' 0545 return chain(imap(lambda x:x, R(Ig(G(seqn))))) 0546 0547 0548 class TestVariousIteratorArgs(unittest.TestCase): 0549 0550 def test_chain(self): 0551 for s in ("123", "", range(1000), ('do', 1.2), xrange(2000,2200,5)): 0552 for g in (G, I, Ig, S, L, R): 0553 self.assertEqual(list(chain(g(s))), list(g(s))) 0554 self.assertEqual(list(chain(g(s), g(s))), list(g(s))+list(g(s))) 0555 self.assertRaises(TypeError, chain, X(s)) 0556 self.assertRaises(TypeError, list, chain(N(s))) 0557 self.assertRaises(ZeroDivisionError, list, chain(E(s))) 0558 0559 def test_cycle(self): 0560 for s in ("123", "", range(1000), ('do', 1.2), xrange(2000,2200,5)): 0561 for g in (G, I, Ig, S, L, R): 0562 tgtlen = len(s) * 3 0563 expected = list(g(s))*3 0564 actual = list(islice(cycle(g(s)), tgtlen)) 0565 self.assertEqual(actual, expected) 0566 self.assertRaises(TypeError, cycle, X(s)) 0567 self.assertRaises(TypeError, list, cycle(N(s))) 0568 self.assertRaises(ZeroDivisionError, list, cycle(E(s))) 0569 0570 def test_groupby(self): 0571 for s in (range(10), range(0), range(1000), (7,11), xrange(2000,2200,5)): 0572 for g in (G, I, Ig, S, L, R): 0573 self.assertEqual([k for k, sb in groupby(g(s))], list(g(s))) 0574 self.assertRaises(TypeError, groupby, X(s)) 0575 self.assertRaises(TypeError, list, groupby(N(s))) 0576 self.assertRaises(ZeroDivisionError, list, groupby(E(s))) 0577 0578 def test_ifilter(self): 0579 for s in (range(10), range(0), range(1000), (7,11), xrange(2000,2200,5)): 0580 for g in (G, I, Ig, S, L, R): 0581 self.assertEqual(list(ifilter(isEven, g(s))), filter(isEven, g(s))) 0582 self.assertRaises(TypeError, ifilter, isEven, X(s)) 0583 self.assertRaises(TypeError, list, ifilter(isEven, N(s))) 0584 self.assertRaises(ZeroDivisionError, list, ifilter(isEven, E(s))) 0585 0586 def test_ifilterfalse(self): 0587 for s in (range(10), range(0), range(1000), (7,11), xrange(2000,2200,5)): 0588 for g in (G, I, Ig, S, L, R): 0589 self.assertEqual(list(ifilterfalse(isEven, g(s))), filter(isOdd, g(s))) 0590 self.assertRaises(TypeError, ifilterfalse, isEven, X(s)) 0591 self.assertRaises(TypeError, list, ifilterfalse(isEven, N(s))) 0592 self.assertRaises(ZeroDivisionError, list, ifilterfalse(isEven, E(s))) 0593 0594 def test_izip(self): 0595 for s in ("123", "", range(1000), ('do', 1.2), xrange(2000,2200,5)): 0596 for g in (G, I, Ig, S, L, R): 0597 self.assertEqual(list(izip(g(s))), zip(g(s))) 0598 self.assertEqual(list(izip(g(s), g(s))), zip(g(s), g(s))) 0599 self.assertRaises(TypeError, izip, X(s)) 0600 self.assertRaises(TypeError, list, izip(N(s))) 0601 self.assertRaises(ZeroDivisionError, list, izip(E(s))) 0602 0603 def test_imap(self): 0604 for s in (range(10), range(0), range(100), (7,11), xrange(20,50,5)): 0605 for g in (G, I, Ig, S, L, R): 0606 self.assertEqual(list(imap(onearg, g(s))), map(onearg, g(s))) 0607 self.assertEqual(list(imap(operator.pow, g(s), g(s))), map(operator.pow, g(s), g(s))) 0608 self.assertRaises(TypeError, imap, onearg, X(s)) 0609 self.assertRaises(TypeError, list, imap(onearg, N(s))) 0610 self.assertRaises(ZeroDivisionError, list, imap(onearg, E(s))) 0611 0612 def test_islice(self): 0613 for s in ("12345", "", range(1000), ('do', 1.2), xrange(2000,2200,5)): 0614 for g in (G, I, Ig, S, L, R): 0615 self.assertEqual(list(islice(g(s),1,None,2)), list(g(s))[1::2]) 0616 self.assertRaises(TypeError, islice, X(s), 10) 0617 self.assertRaises(TypeError, list, islice(N(s), 10)) 0618 self.assertRaises(ZeroDivisionError, list, islice(E(s), 10)) 0619 0620 def test_starmap(self): 0621 for s in (range(10), range(0), range(100), (7,11), xrange(20,50,5)): 0622 for g in (G, I, Ig, S, L, R): 0623 ss = zip(s, s) 0624 self.assertEqual(list(starmap(operator.pow, g(ss))), map(operator.pow, g(s), g(s))) 0625 self.assertRaises(TypeError, starmap, operator.pow, X(ss)) 0626 self.assertRaises(TypeError, list, starmap(operator.pow, N(ss))) 0627 self.assertRaises(ZeroDivisionError, list, starmap(operator.pow, E(ss))) 0628 0629 def test_takewhile(self): 0630 for s in (range(10), range(0), range(1000), (7,11), xrange(2000,2200,5)): 0631 for g in (G, I, Ig, S, L, R): 0632 tgt = [] 0633 for elem in g(s): 0634 if not isEven(elem): break 0635 tgt.append(elem) 0636 self.assertEqual(list(takewhile(isEven, g(s))), tgt) 0637 self.assertRaises(TypeError, takewhile, isEven, X(s)) 0638 self.assertRaises(TypeError, list, takewhile(isEven, N(s))) 0639 self.assertRaises(ZeroDivisionError, list, takewhile(isEven, E(s))) 0640 0641 def test_dropwhile(self): 0642 for s in (range(10), range(0), range(1000), (7,11), xrange(2000,2200,5)): 0643 for g in (G, I, Ig, S, L, R): 0644 tgt = [] 0645 for elem in g(s): 0646 if not tgt and isOdd(elem): continue 0647 tgt.append(elem) 0648 self.assertEqual(list(dropwhile(isOdd, g(s))), tgt) 0649 self.assertRaises(TypeError, dropwhile, isOdd, X(s)) 0650 self.assertRaises(TypeError, list, dropwhile(isOdd, N(s))) 0651 self.assertRaises(ZeroDivisionError, list, dropwhile(isOdd, E(s))) 0652 0653 def test_tee(self): 0654 for s in ("123", "", range(1000), ('do', 1.2), xrange(2000,2200,5)): 0655 for g in (G, I, Ig, S, L, R): 0656 it1, it2 = tee(g(s)) 0657 self.assertEqual(list(it1), list(g(s))) 0658 self.assertEqual(list(it2), list(g(s))) 0659 self.assertRaises(TypeError, tee, X(s)) 0660 self.assertRaises(TypeError, list, tee(N(s))[0]) 0661 self.assertRaises(ZeroDivisionError, list, tee(E(s))[0]) 0662 0663 class LengthTransparency(unittest.TestCase): 0664 0665 def test_repeat(self): 0666 self.assertEqual(len(repeat(None, 50)), 50) 0667 self.assertRaises(TypeError, len, repeat(None)) 0668 0669 class RegressionTests(unittest.TestCase): 0670 0671 def test_sf_793826(self): 0672 # Fix Armin Rigo's successful efforts to wreak havoc 0673 0674 def mutatingtuple(tuple1, f, tuple2): 0675 # this builds a tuple t which is a copy of tuple1, 0676 # then calls f(t), then mutates t to be equal to tuple2 0677 # (needs len(tuple1) == len(tuple2)). 0678 def g(value, first=[1]): 0679 if first: 0680 del first[:] 0681 f(z.next()) 0682 return value 0683 items = list(tuple2) 0684 items[1:1] = list(tuple1) 0685 gen = imap(g, items) 0686 z = izip(*[gen]*len(tuple1)) 0687 z.next() 0688 0689 def f(t): 0690 global T 0691 T = t 0692 first[:] = list(T) 0693 0694 first = [] 0695 mutatingtuple((1,2,3), f, (4,5,6)) 0696 second = list(T) 0697 self.assertEqual(first, second) 0698 0699 0700 def test_sf_950057(self): 0701 # Make sure that chain() and cycle() catch exceptions immediately 0702 # rather than when shifting between input sources 0703 0704 def gen1(): 0705 hist.append(0) 0706 yield 1 0707 hist.append(1) 0708 raise AssertionError 0709 hist.append(2) 0710 0711 def gen2(x): 0712 hist.append(3) 0713 yield 2 0714 hist.append(4) 0715 if x: 0716 raise StopIteration 0717 0718 hist = [] 0719 self.assertRaises(AssertionError, list, chain(gen1(), gen2(False))) 0720 self.assertEqual(hist, [0,1]) 0721 0722 hist = [] 0723 self.assertRaises(AssertionError, list, chain(gen1(), gen2(True))) 0724 self.assertEqual(hist, [0,1]) 0725 0726 hist = [] 0727 self.assertRaises(AssertionError, list, cycle(gen1())) 0728 self.assertEqual(hist, [0,1]) 0729 0730 libreftest = """ Doctest for examples in the library reference: libitertools.tex 0731 0732 0733 >>> amounts = [120.15, 764.05, 823.14] 0734 >>> for checknum, amount in izip(count(1200), amounts): 0735 ... print 'Check %d is for $%.2f' % (checknum, amount) 0736 ... 0737 Check 1200 is for $120.15 0738 Check 1201 is for $764.05 0739 Check 1202 is for $823.14 0740 0741 >>> import operator 0742 >>> for cube in imap(operator.pow, xrange(1,4), repeat(3)): 0743 ... print cube 0744 ... 0745 1 0746 8 0747 27 0748 0749 >>> reportlines = ['EuroPython', 'Roster', '', 'alex', '', 'laura', '', 'martin', '', 'walter', '', 'samuele'] 0750 >>> for name in islice(reportlines, 3, None, 2): 0751 ... print name.title() 0752 ... 0753 Alex 0754 Laura 0755 Martin 0756 Walter 0757 Samuele 0758 0759 >>> from operator import itemgetter 0760 >>> d = dict(a=1, b=2, c=1, d=2, e=1, f=2, g=3) 0761 >>> di = sorted(d.iteritems(), key=itemgetter(1)) 0762 >>> for k, g in groupby(di, itemgetter(1)): 0763 ... print k, map(itemgetter(0), g) 0764 ... 0765 1 ['a', 'c', 'e'] 0766 2 ['b', 'd', 'f'] 0767 3 ['g'] 0768 0769 # Find runs of consecutive numbers using groupby. The key to the solution 0770 # is differencing with a range so that consecutive numbers all appear in 0771 # same group. 0772 >>> data = [ 1, 4,5,6, 10, 15,16,17,18, 22, 25,26,27,28] 0773 >>> for k, g in groupby(enumerate(data), lambda (i,x):i-x): 0774 ... print map(operator.itemgetter(1), g) 0775 ... 0776 [1] 0777 [4, 5, 6] 0778 [10] 0779 [15, 16, 17, 18] 0780 [22] 0781 [25, 26, 27, 28] 0782 0783 >>> def take(n, seq): 0784 ... return list(islice(seq, n)) 0785 0786 >>> def enumerate(iterable): 0787 ... return izip(count(), iterable) 0788 0789 >>> def tabulate(function): 0790 ... "Return function(0), function(1), ..." 0791 ... return imap(function, count()) 0792 0793 >>> def iteritems(mapping): 0794 ... return izip(mapping.iterkeys(), mapping.itervalues()) 0795 0796 >>> def nth(iterable, n): 0797 ... "Returns the nth item" 0798 ... return list(islice(iterable, n, n+1)) 0799 0800 >>> def all(seq, pred=bool): 0801 ... "Returns True if pred(x) is True for every element in the iterable" 0802 ... for elem in ifilterfalse(pred, seq): 0803 ... return False 0804 ... return True 0805 0806 >>> def any(seq, pred=bool): 0807 ... "Returns True if pred(x) is True for at least one element in the iterable" 0808 ... for elem in ifilter(pred, seq): 0809 ... return True 0810 ... return False 0811 0812 >>> def no(seq, pred=bool): 0813 ... "Returns True if pred(x) is False for every element in the iterable" 0814 ... for elem in ifilter(pred, seq): 0815 ... return False 0816 ... return True 0817 0818 >>> def quantify(seq, pred=bool): 0819 ... "Count how many times the predicate is True in the sequence" 0820 ... return sum(imap(pred, seq)) 0821 0822 >>> def padnone(seq): 0823 ... "Returns the sequence elements and then returns None indefinitely" 0824 ... return chain(seq, repeat(None)) 0825 0826 >>> def ncycles(seq, n): 0827 ... "Returns the sequence elements n times" 0828 ... return chain(*repeat(seq, n)) 0829 0830 >>> def dotproduct(vec1, vec2): 0831 ... return sum(imap(operator.mul, vec1, vec2)) 0832 0833 >>> def flatten(listOfLists): 0834 ... return list(chain(*listOfLists)) 0835 0836 >>> def repeatfunc(func, times=None, *args): 0837 ... "Repeat calls to func with specified arguments." 0838 ... " Example: repeatfunc(random.random)" 0839 ... if times is None: 0840 ... return starmap(func, repeat(args)) 0841 ... else: 0842 ... return starmap(func, repeat(args, times)) 0843 0844 >>> def pairwise(iterable): 0845 ... "s -> (s0,s1), (s1,s2), (s2, s3), ..." 0846 ... a, b = tee(iterable) 0847 ... try: 0848 ... b.next() 0849 ... except StopIteration: 0850 ... pass 0851 ... return izip(a, b) 0852 0853 This is not part of the examples but it tests to make sure the definitions 0854 perform as purported. 0855 0856 >>> take(10, count()) 0857 [0, 1, 2, 3, 4, 5, 6, 7, 8, 9] 0858 0859 >>> list(enumerate('abc')) 0860 [(0, 'a'), (1, 'b'), (2, 'c')] 0861 0862 >>> list(islice(tabulate(lambda x: 2*x), 4)) 0863 [0, 2, 4, 6] 0864 0865 >>> nth('abcde', 3) 0866 ['d'] 0867 0868 >>> all([2, 4, 6, 8], lambda x: x%2==0) 0869 True 0870 0871 >>> all([2, 3, 6, 8], lambda x: x%2==0) 0872 False 0873 0874 >>> any([2, 4, 6, 8], lambda x: x%2==0) 0875 True 0876 0877 >>> any([1, 3, 5, 9], lambda x: x%2==0,) 0878 False 0879 0880 >>> no([1, 3, 5, 9], lambda x: x%2==0) 0881 True 0882 0883 >>> no([1, 2, 5, 9], lambda x: x%2==0) 0884 False 0885 0886 >>> quantify(xrange(99), lambda x: x%2==0) 0887 50 0888 0889 >>> a = [[1, 2, 3], [4, 5, 6]] 0890 >>> flatten(a) 0891 [1, 2, 3, 4, 5, 6] 0892 0893 >>> list(repeatfunc(pow, 5, 2, 3)) 0894 [8, 8, 8, 8, 8] 0895 0896 >>> import random 0897 >>> take(5, imap(int, repeatfunc(random.random))) 0898 [0, 0, 0, 0, 0] 0899 0900 >>> list(pairwise('abcd')) 0901 [('a', 'b'), ('b', 'c'), ('c', 'd')] 0902 0903 >>> list(pairwise([])) 0904 [] 0905 0906 >>> list(pairwise('a')) 0907 [] 0908 0909 >>> list(islice(padnone('abc'), 0, 6)) 0910 ['a', 'b', 'c', None, None, None] 0911 0912 >>> list(ncycles('abc', 3)) 0913 ['a', 'b', 'c', 'a', 'b', 'c', 'a', 'b', 'c'] 0914 0915 >>> dotproduct([1,2,3], [4,5,6]) 0916 32 0917 0918 """ 0919 0920 __test__ = {'libreftest' : libreftest} 0921 0922 def test_main(verbose=None): 0923 test_classes = (TestBasicOps, TestVariousIteratorArgs, TestGC, 0924 RegressionTests, LengthTransparency) 0925 test_support.run_unittest(*test_classes) 0926 0927 # verify reference counting 0928 if verbose and hasattr(sys, "gettotalrefcount"): 0929 import gc 0930 counts = [None] * 5 0931 for i in xrange(len(counts)): 0932 test_support.run_unittest(*test_classes) 0933 gc.collect() 0934 counts[i] = sys.gettotalrefcount() 0935 print counts 0936 0937 # doctest the examples in the library reference 0938 test_support.run_doctest(sys.modules[__name__], verbose) 0939 0940 if __name__ == "__main__": 0941 test_main(verbose=True) 0942
Generated by PyXR 0.9.4