[Zope-CVS] CVS: Packages/pypes/pypes/tests - common.py:1.8 test_query.py:1.10
Casey Duncan
casey at zope.com
Wed May 12 01:10:38 EDT 2004
Update of /cvs-repository/Packages/pypes/pypes/tests
In directory cvs.zope.org:/tmp/cvs-serv1583/tests
Modified Files:
common.py test_query.py
Log Message:
Implement "in" join primitive
Basic iterResult() method implementation for Join
=== Packages/pypes/pypes/tests/common.py 1.7 => 1.8 ===
--- Packages/pypes/pypes/tests/common.py:1.7 Wed Mar 10 16:18:03 2004
+++ Packages/pypes/pypes/tests/common.py Wed May 12 01:10:36 2004
@@ -97,8 +97,8 @@
self.identity = pypes.services.identity(self.conn)
get_transaction().commit()
-def sort(lst):
- """Return a sorted copy of a list"""
- l = lst[:]
+def sort(iterable):
+ """Return a sorted list from iterable"""
+ l = list(iterable)
l.sort()
return l
=== Packages/pypes/pypes/tests/test_query.py 1.9 => 1.10 ===
--- Packages/pypes/pypes/tests/test_query.py:1.9 Wed May 5 00:06:36 2004
+++ Packages/pypes/pypes/tests/test_query.py Wed May 12 01:10:36 2004
@@ -333,12 +333,12 @@
for r, l in join:
self.assertEqual(l.big, r.big)
- def test_greater_join(self):
- from pypes.query import greater_join
+ def test_greaterjoin(self):
+ from pypes.query import greaterjoin
us = [TestClass(age=14-i) for i in range(10)]
them = [TestClass(age=i) for i in range(15)]
expr = lambda x: x.age
- joined = list(greater_join(us, expr, them, expr))
+ joined = list(greaterjoin(us, expr, them, expr))
left2right = {}
for left, right in joined:
self.failUnless(left.age > right.age)
@@ -346,40 +346,73 @@
for left, lesser in left2right.items():
self.assertEqual(len(lesser), left.age)
- def test_greater_join_empty(self):
- from pypes.query import greater_join
- joined = greater_join([], lambda x:x, [], lambda x:x)
+ def test_greaterjoin_empty(self):
+ from pypes.query import greaterjoin
+ joined = greaterjoin([], lambda x:x, [], lambda x:x)
self.failIf(list(joined))
objs = [TestClass() for i in range(5)]
- joined = greater_join(objs, lambda x:x, [], lambda x:x)
+ joined = greaterjoin(objs, lambda x:x, [], lambda x:x)
self.failIf(list(joined))
- joined = greater_join([], lambda x:x, objs, lambda x:x)
+ joined = greaterjoin([], lambda x:x, objs, lambda x:x)
self.failIf(list(joined))
- def test_greater_join_none_greater(self):
- from pypes.query import greater_join
+ def test_greaterjoin_none_greater(self):
+ from pypes.query import greaterjoin
objs = [TestClass() for i in range(5)]
- joined = greater_join(objs, lambda x: 1, objs, lambda x: 2)
+ joined = greaterjoin(objs, lambda x: 1, objs, lambda x: 2)
self.failIf(list(joined))
+ def test_injoin(self):
+ from pypes.query import injoin
+ letters = ['a', 'b', 'c', 'd']
+ words = ['a', 'big', 'fat', 'hairy', 'deal']
+ expr = lambda x: x
+ joined = list(injoin(letters, expr, words, expr))
+ self.assertEqual(len(joined), 6)
+ for left, right in joined:
+ self.failUnless(left in right)
+
+ def test_injoin_empty(self):
+ from pypes.query import injoin
+ joined = injoin([], lambda x:x, [], lambda x:x)
+ self.failIf(list(joined))
+ objs = ['' for i in range(5)]
+ joined = injoin(objs, lambda x:x, [], lambda x:x)
+ self.failIf(list(joined))
+ joined = injoin([], lambda x:x, objs, lambda x:x)
+ self.failIf(list(joined))
+
+ def test_injoin_none_match(self):
+ from pypes.query import injoin
+ objs = [TestClass() for i in range(5)]
+ joined = injoin(objs, lambda x: 1, objs, lambda x: [])
+ self.failIf(list(joined))
+
+
class TestJoin(unittest.TestCase):
+ def setUp(self):
+ from pypes.query import Join
+ self.inputs = {'a': Set(('AAA', 'BAA', 'AAC', 'AAD')),
+ 'b': Set(('A', 'B', 'D')),
+ 'c': Set (('AA', 'BB', 'CC'))}
+ self.criteria = DummyExpr(expr='a == b', names=['a', 'b'])
+ self.join = Join(self.inputs, self.criteria)
+
def test_interface(self):
from pypes.query import Join
from pypes.interfaces import IPartialResult
- inputs = {'a':Set(), 'b':Set()}
self.failUnless(verifyObject(
IPartialResult,
- Join(inputs, DummyExpr(expr='a == b', names=['a', 'b']))))
+ Join(self.inputs, DummyExpr(expr='a == b', names=['a', 'b']))))
def test_join_handles_compares(self):
from pypes.query import Join
- inputs = {'a':Set(), 'b':Set()}
- Join(inputs, DummyExpr(expr='a == b', names=['a', 'b']))
- Join(inputs, DummyExpr(expr='a < b', names=['a', 'b']))
- Join(inputs, DummyExpr(expr='a > b', names=['a', 'b']))
- Join(inputs, DummyExpr(expr='a in b', names=['a', 'b']))
- Join(inputs, DummyExpr(
+ Join(self.inputs, DummyExpr(expr='a == b', names=['a', 'b']))
+ Join(self.inputs, DummyExpr(expr='a < b', names=['a', 'b']))
+ Join(self.inputs, DummyExpr(expr='a > b', names=['a', 'b']))
+ Join(self.inputs, DummyExpr(expr='a in b', names=['a', 'b']))
+ Join(self.inputs, DummyExpr(
expr='a.__name__ == b["name"]', names=['a', 'b']))
def test_join_cant_process(self):
@@ -414,6 +447,56 @@
self.assertRaises(
CantProcess, Join, inputs,
DummyExpr(expr='a <= b', names=['a', 'b']))
+
+ def test_input_map(self):
+ self.assertEqual(self.join.inputMap(), self.inputs)
+ self.failIf(self.join.inputMap() is self.inputs)
+
+ def test_criteria(self):
+ self.failUnless(self.join.criteriaExpr() is self.criteria)
+
+ def test_names_used(self):
+ self.assertEqual(self.join.namesUsed(), Set(['a', 'b']))
+
+ def test_select_joined_inputs(self):
+ from pypes.query import Join
+ j = Join(self.inputs, DummyExpr(expr='a[-1] == b', names=['a', 'b']))
+ expected = sort([(b, a) for a in self.inputs['a']
+ for b in self.inputs['b']
+ if a[-1] == b])
+ self.assertEqual(sort(j.iterResult('b', 'a')), expected)
+ j = Join(self.inputs, DummyExpr(expr='b*2 > c', names=['b', 'c']))
+ expected = sort([(b, c) for b in self.inputs['b']
+ for c in self.inputs['c']
+ if b*2 > c])
+ self.assertEqual(sort(j.iterResult('b', 'c')), expected)
+ j = Join(self.inputs, DummyExpr(expr='b in a', names=['a', 'b']))
+ expected = sort([(a, b) for a in self.inputs['a']
+ for b in self.inputs['b']
+ if b in a])
+ self.assertEqual(sort(j.iterResult('a', 'b')), expected)
+
+ def test_select_inputs_not_joined(self):
+ from pypes.query import Join
+ j = Join(self.inputs, DummyExpr(expr='a[-1] == b', names=['a', 'b']))
+ expected = sort([(a, c) for a in self.inputs['a']
+ for b in self.inputs['b']
+ for c in self.inputs['c']
+ if a[-1] == b])
+ self.assertEqual(sort(j.iterResult('a', 'c')), expected)
+
+ def test_union_joins_one_common_input(self):
+ from pypes.query import Join
+ j1 = Join(self.inputs, DummyExpr(expr='a[-1] == b', names=['a', 'b']))
+ j2 = Join(self.inputs, DummyExpr(expr='a[:2] == c', names=['a', 'b']))
+ union = j1.union(j2)
+ self.failIf(union is j1)
+ self.failIf(union is j2)
+ expected = sort([(a, b, c) for a in self.inputs['a']
+ for b in self.inputs['b']
+ for c in self.inputs['c']
+ if a[-1] == b or a[:2] == c])
+ self.assertEqual(sort(union), expected)
if __name__ == '__main__':
unittest.main()
More information about the Zope-CVS mailing list