[Zope-CVS] CVS: Packages/pypes/pypes/tests - test_queryops.py:1.1
test_query.py:1.16
Casey Duncan
casey at zope.com
Thu Jul 1 23:18:57 EDT 2004
Update of /cvs-repository/Packages/pypes/pypes/tests
In directory cvs.zope.org:/tmp/cvs-serv17543/tests
Modified Files:
test_query.py
Added Files:
test_queryops.py
Log Message:
Factor out query primitives into separate queryops module
Implement query results and queries (sans sorting right now)
=== Added File Packages/pypes/pypes/tests/test_queryops.py ===
##############################################################################
#
# Copyright (c) 2004 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Query primitive tests
$Id: test_queryops.py,v 1.1 2004/07/02 03:18:56 caseman Exp $"""
import unittest
from sets import Set
from operator import mul
from compiler import parse
from zope.interface import implements
from zope.interface.verify import verifyObject, verifyClass
from persistent import Persistent
from pypes.interfaces import IExpression, IQueryResult, IPartialResult
from pypes.tests.common import sort, PypesTestCase, TestClass
class DummyExpr:
    """Minimal stand-in for an expression object, used by the query tests.

    An instance wraps a callable and a set of names.  When ``expr`` source
    text is supplied it is parsed with ``compiler.parse`` so that ``ast()``
    has a tree to return and, if no ``func`` was given, calling the instance
    evaluates that source against the namespace mapping passed in.

    Arguments:
      func  -- callable invoked by ``__call__`` (may be None)
      names -- iterable of names reported by ``names()`` and ``freeNames()``
      expr  -- optional expression source string
    """

    implements(IExpression)

    def __init__(self, func=None, names=(), expr=None):
        self._func = func
        self._names = Set(names)
        # Always define _ast so that ast() returns None, rather than raising
        # AttributeError, for instances built without expression source.
        self._ast = None
        if expr is not None:
            self._ast = parse(expr, mode='eval')
            if func is None:
                self._func = lambda ns: eval(expr, {}, ns)

    def names(self):
        """Return the set of names given at construction."""
        return self._names

    def bindings(self):
        """Return an empty mapping; the dummy has no bound names."""
        return {}

    def freeNames(self, free_names=()):
        """Return the same set as names(); ``free_names`` is ignored."""
        return self._names

    def ast(self):
        """Return the parsed tree for ``expr``, or None if none was given."""
        return self._ast

    def freeOperands(self, free_names=()):
        """Return an empty list; ``free_names`` is ignored."""
        return []

    def optimize(self, free_names=()):
        """Do nothing; ``free_names`` is ignored."""
        pass

    def __call__(self, *args, **kw):
        return self._func(*args, **kw)

    def _code(self):
        # Plain functions are compared by code object; anything else
        # (including None) is compared as itself.
        return getattr(self._func, 'func_code', self._func)

    def __eq__(self, other):
        """Equal when both wrap the same code and report the same names."""
        if not isinstance(other, DummyExpr):
            # Let Python fall back to its default comparison instead of
            # failing on a missing _func attribute.
            return NotImplemented
        return (self._code() == other._code()
                and self._names == other._names)

    def __ne__(self, other):
        # Python 2 does not derive != from __eq__, so define it explicitly.
        equal = self.__eq__(other)
        if equal is NotImplemented:
            return equal
        return not equal

    def __or__(self, other):
        """Return a DummyExpr that is true when either operand is true.

        The new expression reports the names common to both operands.
        """
        names = list(Set(self._names) & Set(other._names))
        def func(*args, **kw):
            return self._func(*args, **kw) or other._func(*args, **kw)
        return DummyExpr(func, names)

    def __and__(self, other):
        """Return a DummyExpr that is true when both operands are true.

        The new expression reports the names common to both operands.
        """
        names = list(Set(self._names) & Set(other._names))
        def func(*args, **kw):
            return self._func(*args, **kw) and other._func(*args, **kw)
        return DummyExpr(func, names)
class Foo(TestClass):
    """TestClass subclass used for the 'foo' input of the CartProduct tests."""
class Bar(TestClass):
    """TestClass subclass used for the 'bar' input of the CartProduct tests."""
class TestPrimitives(unittest.TestCase):
    """Tests for union_input_maps() and intersect_input_maps()."""

    def test_union_input_maps(self):
        from pypes.queryops import union_input_maps
        colors = Set(('blue', 'red'))
        first = {'jerry': Set((1, 2, 3)), 'george': colors}
        # Both maps deliberately share the very same 'george' set object.
        second = {'jerry': Set((3, 4, 5)), 'george': colors}
        merged = union_input_maps(first, second)
        self.assertEqual(Set(merged), Set(first))
        for name in ('jerry', 'george'):
            self.assertEqual(merged[name], first[name].union(second[name]))
        self.failIf(merged is first or merged is second)

    def test_union_input_map_to_self(self):
        from pypes.queryops import union_input_maps
        only = {'pee wee': Set(('loner', 'rebel'))}
        merged = union_input_maps(only, only)
        self.assertEqual(only, merged)
        # The result must be a new mapping, not the argument itself.
        self.failIf(only is merged)

    def test_union_input_maps_fails_with_different_inputs(self):
        from pypes.queryops import union_input_maps
        first = dict([(n, Set()) for n in ('larry', 'moe', 'curly')])
        second = dict([(n, Set()) for n in ('larry', 'moe', 'shemp')])
        self.assertRaises(AssertionError, union_input_maps, first, second)

    def test_intersect_input_maps(self):
        from pypes.queryops import intersect_input_maps
        colors = Set(('blue', 'red'))
        first = {'jerry': Set((1, 2, 3)), 'george': colors}
        # Both maps deliberately share the very same 'george' set object.
        second = {'jerry': Set((3, 4, 5)), 'george': colors}
        common = intersect_input_maps(first, second)
        self.assertEqual(Set(common), Set(first))
        for name in ('jerry', 'george'):
            self.assertEqual(
                common[name], first[name].intersection(second[name]))
        self.failIf(common is first or common is second)

    def test_intersect_input_map_to_self(self):
        from pypes.queryops import intersect_input_maps
        only = {'pee wee': Set(('loner', 'rebel'))}
        common = intersect_input_maps(only, only)
        self.assertEqual(only, common)
        # The result must be a new mapping, not the argument itself.
        self.failIf(only is common)

    def test_intersect_input_maps_fails_with_different_inputs(self):
        from pypes.queryops import intersect_input_maps
        first = dict([(n, Set()) for n in ('larry', 'moe', 'curly')])
        second = dict([(n, Set()) for n in ('larry', 'moe', 'shemp')])
        self.assertRaises(AssertionError, intersect_input_maps, first, second)
class TestCartProduct(unittest.TestCase):
    """Tests for CartProduct built over a 'foo' and a 'bar' input."""

    def setUp(self):
        from pypes.queryops import CartProduct
        foos = [Foo(foo=n) for n in range(10)]
        # chr(65) is 'A', so the bars carry the letters 'A' through 'Z'.
        bars = [Bar(bar=chr(65 + n)) for n in range(26)]
        self.foos = Set(foos)
        self.bars = Set(bars)
        self.inputs = {'foo': self.foos, 'bar': self.bars}
        self.cp = CartProduct(self.inputs)

    def test_interface(self):
        from pypes.interfaces import IPartialResult
        provided = verifyObject(IPartialResult, self.cp)
        self.failUnless(provided)

    def test_input_map(self):
        # Equal to what was passed in, but not the same mapping object.
        self.assertEqual(self.cp.inputMap(), self.inputs)
        same_object = self.cp.inputMap() is self.inputs
        self.failIf(same_object)

    def test_criteria(self):
        from pypes.queryops import CartProduct
        self.assertEqual(self.cp.criteriaExpr(), None)
        expr = DummyExpr(None)
        filtered = CartProduct(self.inputs, expr)
        self.failUnless(filtered.criteriaExpr() is expr)

    def test_names_used(self):
        from pypes.queryops import CartProduct
        self.failIf(self.cp.namesUsed())
        crew = ['kirk', 'spock', 'mccoy']
        filtered = CartProduct(self.inputs, DummyExpr(None, crew))
        self.assertEqual(filtered.namesUsed(), Set(['kirk', 'spock', 'mccoy']))

    def test_magnitude_all_inputs(self):
        # The product of the sizes of every input.
        expected = reduce(mul, map(len, self.inputs.values()))
        self.assertEqual(self.cp.magnitude(), expected)
        self.assertEqual(self.cp.magnitude('foo', 'bar'), expected)

    def test_magnitude_one_input(self):
        for name in ('bar', 'foo'):
            self.assertEqual(self.cp.magnitude(name), len(self.inputs[name]))

    def test_iter_one_no_criteria(self):
        rows = list(self.cp.iterResult('foo'))
        # One row per (foo, bar) combination even though only foo is selected.
        self.assertEqual(len(rows), len(self.foos) * len(self.bars))
        self.assertEqual(Set(rows), Set(self.foos))

    def test_iter_two_with_criteria(self):
        from pypes.queryops import CartProduct
        def wanted(ipt):
            return ipt['foo'].foo < 5 and ipt['bar'].bar in ('A', 'B')
        filtered = CartProduct(self.inputs, wanted)
        rows = 0
        for foo, bar in filtered.iterResult('foo', 'bar'):
            self.failUnless(foo.foo < 5 and bar.bar in ('A', 'B'))
            rows = rows + 1
        self.assertEqual(rows, len(self.foos))

    def test_iter_three_with_fixed(self):
        from pypes.queryops import CartProduct
        inputs = {'one': [1, 2, 3, 4, 5, 6],
                  'two': list('ABCDEFGHIJKLMNOP'),
                  'three': [2, 4, 6, 8, 10]}
        product = CartProduct(inputs)
        # Fixing one input leaves the product of the other two.
        rows = 0
        for one, two, three in product.iterResult(
            'one', 'two', 'three', two='C'):
            self.assertEqual(two, 'C')
            rows = rows + 1
        self.assertEqual(rows, len(inputs['one']) * len(inputs['three']))
        # Fixing two inputs leaves only the third to vary.
        rows = 0
        for two, three, one in product.iterResult(
            'two', 'three', 'one', two='C', one=2):
            self.assertEqual(two, 'C')
            self.assertEqual(one, 2)
            rows = rows + 1
        self.assertEqual(rows, len(inputs['three']))

    def test_iter_with_fixed_fails(self):
        from pypes.exceptions import QueryError
        # A fixed value that is not a member of the input yields nothing.
        not_member = self.cp.iterResult('foo', 'bar', bar=Bar())
        self.assertRaises(StopIteration, not_member.next)
        # A fixed name that is not an input is an error.
        unknown_name = self.cp.iterResult('foo', 'bar', baz=None)
        self.assertRaises(QueryError, unknown_name.next)

    def test_union_cp_with_cp(self):
        from pypes.queryops import CartProduct
        foo_is_3 = CartProduct(
            self.inputs, DummyExpr(lambda ipt: ipt['foo'].foo == 3))
        bar_is_a_or_b = CartProduct(
            self.inputs, DummyExpr(lambda ipt: ipt['bar'].bar in ('A', 'B')))
        either = foo_is_3.union(bar_is_a_or_b)
        self.failIf(either is foo_is_3)
        self.failIf(either is bar_is_a_or_b)
        rows = 0
        for foo, bar in either.iterResult('foo', 'bar'):
            self.failUnless(foo.foo == 3 or bar.bar in ('A', 'B'))
            rows = rows + 1
        self.assertEqual(rows, 44)

    def test_intersect_cp_with_cp(self):
        from pypes.queryops import CartProduct
        foo_small = CartProduct(
            self.inputs, DummyExpr(lambda ipt: ipt['foo'].foo < 5))
        bar_is_a_or_b = CartProduct(
            self.inputs, DummyExpr(lambda ipt: ipt['bar'].bar in ('A', 'B')))
        both = foo_small.intersection(bar_is_a_or_b)
        self.failIf(both is foo_small)
        self.failIf(both is bar_is_a_or_b)
        rows = 0
        for foo, bar in both.iterResult('foo', 'bar'):
            self.failUnless(foo.foo < 5 and bar.bar in ('A', 'B'))
            rows = rows + 1
        self.assertEqual(rows, len(self.foos))

    def test_result_set_no_criteria(self):
        for name in ('foo', 'bar'):
            self.assertEqual(self.cp.resultSet(name), self.inputs[name])

    def test_result_set_criteria_tests_one_input(self):
        from pypes.queryops import CartProduct
        foo_is_3 = DummyExpr(lambda ipt: ipt['foo'].foo == 3, names=['foo'])
        filtered = CartProduct(self.inputs, foo_is_3)
        matching = [foo for foo in self.inputs['foo'] if foo.foo == 3]
        self.assertEqual(list(filtered.resultSet('foo')), matching)
        self.assertEqual(filtered.resultSet('bar'), self.inputs['bar'])
        # A criteria nothing satisfies empties every result set.
        foo_is_99 = DummyExpr(lambda ipt: ipt['foo'].foo == 99, names=['foo'])
        filtered = CartProduct(self.inputs, foo_is_99)
        self.failIf(filtered.resultSet('foo'))
        self.failIf(filtered.resultSet('bar'))

    def test_result_set_criteria_tests_all_inputs(self):
        from pypes.queryops import CartProduct
        criteria = DummyExpr(
            lambda ipt: ipt['foo'].foo > 5 and ipt['bar'].bar in ('B', 'Y'),
            names=['foo', 'bar'])
        filtered = CartProduct(self.inputs, criteria)
        big_foos = Set([foo for foo in self.inputs['foo'] if foo.foo > 5])
        self.assertEqual(filtered.resultSet('foo'), big_foos)
        b_or_y = Set([bar for bar in self.inputs['bar']
                      if bar.bar in ('B', 'Y')])
        self.assertEqual(filtered.resultSet('bar'), b_or_y)

    def test_result_set_criteria_matches_all(self):
        from pypes.queryops import CartProduct
        criteria = DummyExpr(
            lambda ipt: ipt['foo'].foo < 50 and ipt['bar'].bar,
            names=['foo', 'bar'])
        filtered = CartProduct(self.inputs, criteria)
        for name in ('foo', 'bar'):
            self.assertEqual(filtered.resultSet(name), self.inputs[name])

    def test_result_set_bogus_input(self):
        from pypes.exceptions import QueryError
        self.assertRaises(QueryError, self.cp.resultSet, 'cartouche')
class TestSort(unittest.TestCase):
    """Tests for queryops.sort() on a scrambled list of TestClass objects."""

    def setUp(self):
        self.ordered = [TestClass(foo=n) for n in range(10)]
        mixed = []
        # Interleave items taken from the front and from the back of the
        # ordered list so the input is not already sorted.
        for n in range(5):
            from_front = self.ordered[n]
            from_back = self.ordered[9 - n]
            mixed.insert(-1, from_front)
            mixed.insert(1, from_back)
        self.scrambled = mixed

    def test_sort_simple(self):
        from pypes.queryops import sort
        result = list(sort(self.scrambled, lambda x: x.foo))
        self.assertEqual(result, self.ordered)

    def test_sort_descending(self):
        from pypes.queryops import sort
        result = list(sort(self.scrambled, lambda x: x.foo, 'descending'))
        expected = list(self.ordered)
        expected.reverse()
        self.assertEqual(result, expected)

    def test_sort_limit_ascending(self):
        from pypes.queryops import sort
        result = list(sort(self.scrambled, lambda x: x.foo, limit=3))
        self.assertEqual(result, self.ordered[:3])

    def test_sort_limit_descending(self):
        from pypes.queryops import sort
        result = list(
            sort(self.scrambled, lambda x: x.foo, 'descending', limit=3))
        expected = list(self.ordered)
        expected.reverse()
        self.assertEqual(result, expected[:3])

    def test_sort_empty(self):
        from pypes.queryops import sort
        # Every combination of order and limit must give a false result.
        self.failIf(sort([], lambda x:x))
        self.failIf(sort([], lambda x:x, 'descending'))
        self.failIf(sort([], lambda x:x, limit=20))
        self.failIf(sort([], lambda x:x, 'descending', limit=5))
class TestJoinPrimitives(unittest.TestCase):
    """Tests for the equijoin(), greaterjoin() and injoin() functions.

    Each join is called as ``join(left, left_expr, right, right_expr)`` and
    is expected to yield ``(left_item, right_item)`` pairs.
    """

    def test_equijoin_one2one(self):
        from pypes.queryops import equijoin
        us = [TestClass(age=i+10) for i in range(15)]
        them = [TestClass(age=14-i) for i in range(10)]
        expr = lambda x: x.age
        joined = list(equijoin(us, expr, them, expr))
        # Ages 10 through 14 are the only ones present on both sides.
        self.assertEqual(len(joined), 5)
        for left, right in joined:
            self.assertEqual(left.age, right.age)

    def test_equijoin_not_hashable(self):
        from pypes.queryops import equijoin
        left = ['a', 'bc', ['s','a'], []]
        right = ['bc', ['s','a'], ['yo']]
        joined = list(equijoin(left, lambda x: x, right, lambda x: x))
        self.assertEqual(
            sort(joined), sort([('bc','bc'), (['s','a'],['s','a'])]))

    def test_equijoin_empty(self):
        from pypes.queryops import equijoin
        joined = equijoin([], lambda x:x, [], lambda x:x)
        self.failIf(list(joined))
        objs = [TestClass() for i in range(5)]
        joined = equijoin(objs, lambda x:x, [], lambda x:x)
        self.failIf(list(joined))
        joined = equijoin([], lambda x:x, objs, lambda x:x)
        self.failIf(list(joined))

    def test_equijoin_none_match(self):
        from pypes.queryops import equijoin
        objs = [TestClass() for i in range(5)]
        joined = equijoin(objs, lambda x: 1, objs, lambda x: -1)
        self.failIf(list(joined))

    def test_equijoin_one2many(self):
        from pypes.queryops import equijoin
        colors = ['red', 'orange', 'yellow', 'green']
        left = Set([TestClass(shade=c) for c in colors])
        # The right side holds several objects per shade, and none are red.
        right = Set()
        for i in range(4):
            for c in colors[1:]:
                right.union_update([TestClass(shade=c) for k in range(i)])
        shade = lambda x: x.shade
        joined_right = Set()
        joined_left = Set()
        for l, r in equijoin(left, shade, right, shade):
            self.assertEqual(l.shade, r.shade)
            self.assertNotEqual(l.shade, 'red')
            joined_left.add(l)
            joined_right.add(r)
        self.assertEqual(len(joined_left), len(left) - 1)
        self.assertEqual(joined_right, right)
        # Test left <=> right switch optimization
        # Start from empty accumulators: reusing the sets filled above would
        # let these assertions pass even if the swapped join yielded nothing.
        joined_right = Set()
        joined_left = Set()
        for r, l in equijoin(right, shade, left, shade):
            self.assertEqual(l.shade, r.shade)
            self.assertNotEqual(l.shade, 'red')
            joined_left.add(l)
            joined_right.add(r)
        self.assertEqual(len(joined_left), len(left) - 1)
        self.assertEqual(joined_right, right)

    def test_equijoin_many2many(self):
        from pypes.queryops import equijoin
        big = [TestClass(big=True) for i in range(10)]
        small = [TestClass(big=False) for i in range(10)]
        left = big[4:] + small[4:]
        right = big[:4] + small[:4]
        isbig = lambda x: x.big
        join = list(equijoin(left, isbig, right, isbig))
        # Each item matches the half of the other side with the same flag.
        self.assertEqual(len(join), len(left) * len(right) // 2)
        for l, r in join:
            self.assertEqual(l.big, r.big)
        # Test left <=> right switch optimization
        join = list(equijoin(right, isbig, left, isbig))
        self.assertEqual(len(join), len(left) * len(right) // 2)
        for r, l in join:
            self.assertEqual(l.big, r.big)

    def test_greaterjoin(self):
        from pypes.queryops import greaterjoin
        us = [TestClass(age=14-i) for i in range(10)]
        them = [TestClass(age=i) for i in range(15)]
        expr = lambda x: x.age
        joined = list(greaterjoin(us, expr, them, expr))
        left2right = {}
        for left, right in joined:
            self.failUnless(left.age > right.age)
            left2right.setdefault(left, []).append(right)
        # "them" has ages 0..14, so exactly left.age of them are younger.
        for left, lesser in left2right.items():
            self.assertEqual(len(lesser), left.age)

    def test_greaterjoin_empty(self):
        from pypes.queryops import greaterjoin
        joined = greaterjoin([], lambda x:x, [], lambda x:x)
        self.failIf(list(joined))
        objs = [TestClass() for i in range(5)]
        joined = greaterjoin(objs, lambda x:x, [], lambda x:x)
        self.failIf(list(joined))
        joined = greaterjoin([], lambda x:x, objs, lambda x:x)
        self.failIf(list(joined))

    def test_greaterjoin_none_greater(self):
        from pypes.queryops import greaterjoin
        objs = [TestClass() for i in range(5)]
        joined = greaterjoin(objs, lambda x: 1, objs, lambda x: 2)
        self.failIf(list(joined))

    def test_injoin(self):
        from pypes.queryops import injoin
        letters = ['a', 'b', 'c', 'd']
        words = ['a', 'big', 'fat', 'hairy', 'deal']
        expr = lambda x: x
        joined = list(injoin(letters, expr, words, expr))
        self.assertEqual(len(joined), 6)
        for left, right in joined:
            self.failUnless(left in right)

    def test_injoin_empty(self):
        from pypes.queryops import injoin
        joined = injoin([], lambda x:x, [], lambda x:x)
        self.failIf(list(joined))
        objs = ['' for i in range(5)]
        joined = injoin(objs, lambda x:x, [], lambda x:x)
        self.failIf(list(joined))
        joined = injoin([], lambda x:x, objs, lambda x:x)
        self.failIf(list(joined))

    def test_injoin_none_match(self):
        from pypes.queryops import injoin
        objs = [TestClass() for i in range(5)]
        joined = injoin(objs, lambda x: 1, objs, lambda x: [])
        self.failIf(list(joined))

    def test_injoin_not_hashable(self):
        from pypes.queryops import injoin
        left = ['a', ['yo'], ['s','a'], []]
        right = ['bc', [['s','a'], ['yo']]]
        joined = list(injoin(left, lambda x: x, right, lambda x: x))
        self.assertEqual(
            sort(joined), sort([(['yo'],[['s','a'], ['yo']]),
                                (['s','a'],[['s','a'], ['yo']])]))
class TestJoin(unittest.TestCase):
    """Tests for Join built over three inputs of short strings."""

    def setUp(self):
        from pypes.queryops import Join
        self.inputs = {'a': Set(('AAA', 'BAA', 'AAC', 'AAD')),
                       'b': Set(('A', 'B', 'D')),
                       'c': Set(('AA', 'BB', 'CC'))}
        self.criteria = DummyExpr(expr='a == b', names=['a', 'b'])
        self.join = Join(self.inputs, self.criteria)

    def _join(self, expr, names):
        # Build a Join over the fixture inputs from criteria source text.
        from pypes.queryops import Join
        return Join(self.inputs, DummyExpr(expr=expr, names=names))

    def test_interface(self):
        from pypes.interfaces import IPartialResult
        join = self._join('a == b', ['a', 'b'])
        self.failUnless(verifyObject(IPartialResult, join))

    def test_join_handles_compares(self):
        # Constructing the Join must not raise for any of these criteria.
        accepted = ['a == b', 'a < b', 'a > b', 'a in b',
                    'a.__name__ == b["name"]']
        for source in accepted:
            self._join(source, ['a', 'b'])

    def test_join_cant_process(self):
        from pypes.queryops import Join
        from pypes.exceptions import CantProcess
        inputs = {'a': Set(), 'b': Set(), 'c': Set()}
        rejected = [
            ('True', []),
            ('a', ['a']),
            ('a == 1', ['a']),
            ('a == a', ['a']),
            ('a + b', ['a', 'b']),
            ('a + b > 0', ['a', 'b']),
            ('0 < a + b', ['a', 'b']),
            ('a * b == a + b', ['a', 'b']),
            ('a == b == c', ['a', 'b', 'c']),
            ('a >= b', ['a', 'b']),
            ('a <= b', ['a', 'b']),
        ]
        for source, names in rejected:
            criteria = DummyExpr(expr=source, names=names)
            self.assertRaises(CantProcess, Join, inputs, criteria)

    def test_input_map(self):
        # Equal to what was passed in, but not the same mapping object.
        self.assertEqual(self.join.inputMap(), self.inputs)
        same_object = self.join.inputMap() is self.inputs
        self.failIf(same_object)

    def test_criteria(self):
        self.failUnless(self.join.criteriaExpr() is self.criteria)

    def test_names_used(self):
        self.assertEqual(self.join.namesUsed(), Set(['a', 'b']))

    def test_select_joined_inputs(self):
        a_values = self.inputs['a']
        b_values = self.inputs['b']
        c_values = self.inputs['c']
        # Equality join, selected in the opposite order to the criteria.
        j = self._join('a[-1] == b', ['a', 'b'])
        expected = sort([(b, a) for a in a_values
                                for b in b_values
                                if a[-1] == b])
        self.assertEqual(sort(j.iterResult('b', 'a')), expected)
        # Greater-than join.
        j = self._join('b*2 > c', ['b', 'c'])
        expected = sort([(b, c) for b in b_values
                                for c in c_values
                                if b*2 > c])
        self.assertEqual(sort(j.iterResult('b', 'c')), expected)
        # Containment join.
        j = self._join('b in a', ['a', 'b'])
        expected = sort([(a, b) for a in a_values
                                for b in b_values
                                if b in a])
        self.assertEqual(sort(j.iterResult('a', 'b')), expected)

    def test_select_inputs_not_joined(self):
        j = self._join('a[-1] == b', ['a', 'b'])
        pairs = []
        for a in self.inputs['a']:
            for b in self.inputs['b']:
                for c in self.inputs['c']:
                    if a[-1] == b:
                        pairs.append((a, c))
        self.assertEqual(Set(j.iterResult('a', 'c')), Set(pairs))

    def test_select_join_fixed_right(self):
        j = self._join('a[-1] == b', ['a', 'b'])
        matches = [a for a in self.inputs['a']
                     for b in self.inputs['b']
                     if a[-1] == b and b == 'A']
        self.assertEqual(Set(j.iterResult('a', b='A')), Set(matches))

    def test_select_join_fixed_left(self):
        j = self._join('a[-1] == b', ['a', 'b'])
        matches = [b for a in self.inputs['a']
                     for b in self.inputs['b']
                     if a[-1] == b and a == 'BAA']
        self.assertEqual(Set(j.iterResult('b', a='BAA')), Set(matches))

    def test_select_join_fixed_both(self):
        j = self._join('a[-1] == b', ['a', 'b'])
        selected = Set(j.iterResult('a', 'b', a='AAD', b='D'))
        self.assertEqual(selected, Set([('AAD', 'D')]))

    def test_select_join_fixed_nomatch(self):
        j = self._join('a[:2] == c', ['a', 'c'])
        # None of these fixed (a, c) pairs is expected to produce a row.
        fixed_pairs = [('AAA', 'CC'), ('BCC', 'CC'),
                       ('AAA', 'EE'), ('REE', 'EE')]
        for a_value, c_value in fixed_pairs:
            rows = j.iterResult('a', 'c', a=a_value, c=c_value)
            self.assertRaises(StopIteration, rows.next)

    def test_select_join_fixed_bad_input(self):
        from pypes.exceptions import QueryError
        j = self._join('a[:2] == c', ['a', 'c'])
        self.assertRaises(
            QueryError, j.iterResult, 'a', 'c', uncle='bob')

    def test_union_joins_one_common_input(self):
        j1 = self._join('a[-1] == b', ['a', 'b'])
        j2 = self._join('a[:2] == c', ['a', 'c'])
        union = j1.union(j2)
        self.failIf(union is j1)
        self.failIf(union is j2)
        triples = []
        for a in self.inputs['a']:
            for b in self.inputs['b']:
                for c in self.inputs['c']:
                    if a[-1] == b or a[:2] == c:
                        triples.append((a, b, c))
        expected = sort(triples)
        self.assertEqual(sort(union.iterResult('a', 'b', 'c')), expected)

    def test_left_result_set(self):
        j = self._join('a[-1] == b', ['a', 'b'])
        matches = [a for a in self.inputs['a']
                     for b in self.inputs['b']
                     if a[-1] == b]
        self.assertEqual(j.resultSet('a'), Set(matches))

    def test_right_result_set(self):
        j = self._join('a[-1] == b', ['a', 'b'])
        matches = [b for a in self.inputs['a']
                     for b in self.inputs['b']
                     if a[-1] == b]
        self.assertEqual(j.resultSet('b'), Set(matches))

    def test_result_set_not_in_criteria(self):
        j = self._join('a[-1] == b', ['a', 'b'])
        self.assertEqual(j.resultSet('c'), self.inputs['c'])

    def test_empty_result_set(self):
        # The fixture criteria 'a == b' matches no pair of fixture values.
        for name in ('a', 'b', 'c'):
            self.failIf(self.join.resultSet(name))

    def test_result_set_bogus_input(self):
        from pypes.exceptions import QueryError
        self.assertRaises(QueryError, self.join.resultSet, 'cartouche')
class TestActualizedResult(unittest.TestCase):
    """Tests for ActualizedResult over a numeric and a letter input."""

    def setUp(self):
        from pypes.queryops import ActualizedResult
        self.inputs = {'a': Set((10, 20, 30, 40, 50, 60)),
                       'b': Set('ABCDEFGHIJKLMNOPQRSTUVWXYZ')}
        self.criteria = DummyExpr(
            expr="a%20 == 0 and b > 'M'", names=['a','b'])
        self.result = ActualizedResult(self.inputs, self.criteria)

    def test_interface(self):
        from pypes.interfaces import IActualizedPartialResult
        provided = verifyObject(IActualizedPartialResult, self.result)
        self.failUnless(provided)

    def test_can_process(self):
        from pypes.queryops import ActualizedResult
        # Constructing the result must not raise for any of these criteria.
        accepted = [
            DummyExpr(expr="a == 10", names=['a']),
            DummyExpr(expr="a == 10 or a == 30", names=['a']),
            DummyExpr(expr="a == 10 and b in ('a','b')", names=['a', 'b']),
        ]
        for criteria in accepted:
            ActualizedResult(self.inputs, criteria)

    def test_cant_process(self):
        from pypes.queryops import ActualizedResult
        from pypes.exceptions import CantProcess
        rejected = [
            DummyExpr(expr="a == b", names=['a', 'b']),
            DummyExpr(expr="a.startswith(b)", names=['a', 'b']),
        ]
        for criteria in rejected:
            self.assertRaises(
                CantProcess, ActualizedResult, self.inputs, criteria)

    def test_input_map(self):
        # The input map is expected to hold only the matching values.
        expected = {'a': Set((20, 40, 60)),
                    'b': Set('NOPQRSTUVWXYZ')}
        self.assertEqual(self.result.inputMap(), expected)
        same_object = self.result.inputMap() is self.inputs
        self.failIf(same_object)

    def test_criteria(self):
        self.failUnless(self.result.criteriaExpr() is self.criteria)

    def test_names_used(self):
        self.assertEqual(self.result.namesUsed(), Set(['a', 'b']))

    def test_no_criteria(self):
        from pypes.queryops import ActualizedResult
        values = Set((1, 2, 3, 4))
        unfiltered = ActualizedResult({'x': values})
        self.assertEqual(Set(unfiltered.iterResult('x')), values)

    def test_select_one_input(self):
        multiples = Set([a for a in self.inputs['a'] if a%20 == 0])
        self.assertEqual(Set(self.result.iterResult('a')), multiples)
        late_letters = Set([b for b in self.inputs['b'] if b > 'M'])
        self.assertEqual(Set(self.result.iterResult('b')), late_letters)

    def test_select_multiple_inputs(self):
        pairs = sort([(a, b) for a in self.inputs['a']
                             for b in self.inputs['b']
                             if a%20 == 0 and b > 'M'])
        self.assertEqual(sort(self.result.iterResult('a', 'b')), pairs)
        # Selecting in the other order gives the same pairs, flipped.
        flipped = sort([(b, a) for a, b in pairs])
        self.assertEqual(sort(self.result.iterResult('b', 'a')), flipped)

    def test_empty_result(self):
        from pypes.queryops import ActualizedResult
        nothing = ActualizedResult(
            self.inputs, DummyExpr(expr="a > 100", names=['a']))
        for selection in (('a', 'b'), ('b', 'a'), ('a',), ('b',)):
            self.failIf(list(nothing.iterResult(*selection)))

    def test_select_fixed_one_input(self):
        self.assertEqual(list(self.result.iterResult('a', a=40)), [40])
        selected = Set(self.result.iterResult('b', a=40))
        self.assertEqual(selected, self.result.inputMap()['b'])

    def test_select_fixed_two_inputs(self):
        rows = list(self.result.iterResult('a', 'b', a=20, b='Q'))
        self.assertEqual(rows, [(20, 'Q')])

    def test_select_fixed_nomatch(self):
        # Fixed values that fail the criteria must produce no rows.
        rows = self.result.iterResult('a', 'b', a=30)
        self.assertRaises(StopIteration, rows.next)
        rows = self.result.iterResult('a', 'b', b='E')
        self.assertRaises(StopIteration, rows.next)
        rows = self.result.iterResult('a', 'b', a=1000, b='Jeez')
        self.assertRaises(StopIteration, rows.next)

    def test_select_fixed_bad_input(self):
        from pypes.exceptions import QueryError
        rows = self.result.iterResult('a', 'b', uncle='bob')
        self.assertRaises(QueryError, rows.next)

    def test_result_set_input_in_criteria(self):
        self.assertEqual(self.result.resultSet('a'), Set((20, 40, 60)))
        self.assertEqual(self.result.resultSet('b'), Set('NOPQRSTUVWXYZ'))

    def test_empty_result_set(self):
        from pypes.queryops import ActualizedResult
        nothing = ActualizedResult(
            self.inputs, DummyExpr(expr="a > 100", names=['a']))
        for name in ('a', 'b'):
            self.failIf(nothing.resultSet(name))

    def test_result_set_bogus_input(self):
        from pypes.exceptions import QueryError
        self.assertRaises(QueryError, self.result.resultSet, 'cleo')

    def test_len(self):
        matching = [(a, b) for a in self.inputs['a']
                           for b in self.inputs['b']
                           if a%20 == 0 and b > 'M']
        self.assertEqual(len(self.result), len(matching))
# Run every test case in this module when it is executed as a script.
if __name__ == '__main__':
    unittest.main()
=== Packages/pypes/pypes/tests/test_query.py 1.15 => 1.16 ===
--- Packages/pypes/pypes/tests/test_query.py:1.15 Thu Jun 10 00:44:24 2004
+++ Packages/pypes/pypes/tests/test_query.py Thu Jul 1 23:18:56 2004
@@ -19,752 +19,456 @@
from sets import Set
from operator import mul
from compiler import parse
-from zope.interface import implements
-from zope.interface.verify import verifyObject
+from zope.interface import implements, directlyProvides
+from zope.interface.verify import verifyObject, verifyClass
from persistent import Persistent
-from pypes.interfaces import IExpression
+from pypes.interfaces import IOrderExpression, IQueryResult, IPartialResult
from pypes.tests.common import sort, PypesTestCase, TestClass
-
-class DummyExpr:
+from pypes.tests.test_queryops import DummyExpr
+
+
+class DummyResult:
- implements(IExpression)
+ implements(IQueryResult)
- def __init__(self, func=None, names=[], expr=None):
- self._func = func
- self._names = Set(names)
- if expr is not None:
- self._ast = parse(expr, mode='eval')
- if func is None:
- self._func = lambda ns: eval(expr, {}, ns)
+ def __init__(self, query, names, part_result):
+ self._query = query
+ self._names = names
+ self._result = part_result
+ self._evaluated = False
def names(self):
return self._names
+
+ def query(self):
+ return self._query
- def bindings(self):
- return {}
-
- def freeNames(self, free_names=[]):
- return self._names
+ def evaluated(self):
+ return self._evaluated
- def ast(self):
- return self._ast
+ def __iter__(self):
+ return self._result.iterResult(self.names)
- def freeOperands(self, free_names=[]):
- return []
+ def __len__(self):
+ self._evaluated = True
+ return len(list(iter(self)))
- def optimize(self, free_names=[]):
- pass
+ def __contains__(self, item):
+ return item in list(iter(self))
+
+
+class DummyPartResult:
- def __call__(self, *args, **kw):
- return self._func(*args, **kw)
-
- def __eq__(self, other):
- return (self._func.func_code == other._func.func_code
- and self._names == other.__names)
-
- def __or__(self, other):
- names = list(Set(self._names) & Set(other._names))
- def func(*args, **kw):
- return self._func(*args, **kw) or other._func(*args, **kw)
- return DummyExpr(func, names)
-
- def __and__(self, other):
- names = list(Set(self._names) & Set(other._names))
- def func(*args, **kw):
- return self._func(*args, **kw) and other._func(*args, **kw)
- return DummyExpr(func, names)
-
-class Foo(TestClass):
- pass
-
-class Bar(TestClass):
- pass
-
-class TestPrimitives(unittest.TestCase):
-
- def test_union_input_maps(self):
- from pypes.query import union_input_maps
- map1 = {'jerry': Set((1,2,3)), 'george': Set(('blue', 'red'))}
- map2 = {'jerry': Set((3,4,5)), 'george': map1['george']}
- union = union_input_maps(map1, map2)
- self.assertEqual(Set(union), Set(map1))
- self.assertEqual(union['jerry'], map1['jerry'].union(map2['jerry']))
- self.assertEqual(union['george'], map1['george'].union(map2['george']))
- self.failIf(union is map1 or union is map2)
-
- def test_union_input_map_to_self(self):
- from pypes.query import union_input_maps
- inmap = {'pee wee': Set(('loner', 'rebel'))}
- union = union_input_maps(inmap, inmap)
- self.assertEqual(inmap, union)
- self.failIf(inmap is union)
-
- def test_union_input_maps_fails_with_different_inputs(self):
- from pypes.query import union_input_maps
- map1 = {'larry':Set(), 'moe':Set(), 'curly':Set()}
- map2 = {'larry':Set(), 'moe':Set(), 'shemp':Set()}
- self.assertRaises(AssertionError, union_input_maps, map1, map2)
-
- def test_intersect_input_maps(self):
- from pypes.query import intersect_input_maps
- map1 = {'jerry': Set((1,2,3)), 'george': Set(('blue', 'red'))}
- map2 = {'jerry': Set((3,4,5)), 'george': map1['george']}
- intersect = intersect_input_maps(map1, map2)
- self.assertEqual(Set(intersect), Set(map1))
- self.assertEqual(
- intersect['jerry'], map1['jerry'].intersection(map2['jerry']))
- self.assertEqual(
- intersect['george'], map1['george'].intersection(map2['george']))
- self.failIf(intersect is map1 or intersect is map2)
-
- def test_intersect_input_map_to_self(self):
- from pypes.query import intersect_input_maps
- inmap = {'pee wee': Set(('loner', 'rebel'))}
- intersect = intersect_input_maps(inmap, inmap)
- self.assertEqual(inmap, intersect)
- self.failIf(inmap is intersect)
-
- def test_intersect_input_maps_fails_with_different_inputs(self):
- from pypes.query import intersect_input_maps
- map1 = {'larry':Set(), 'moe':Set(), 'curly':Set()}
- map2 = {'larry':Set(), 'moe':Set(), 'shemp':Set()}
- self.assertRaises(AssertionError, intersect_input_maps, map1, map2)
+ implements(IPartialResult)
-class TestCartProduct(unittest.TestCase):
+ def __init__(self, input_map, criteria):
+ self._input_map = input_map
+ self._criteria = criteria
- def setUp(self):
- from pypes.query import CartProduct
- self.foos = Set([Foo(foo=i) for i in range(10)])
- self.bars = Set([Bar(bar=chr(i+65)) for i in range(26)])
- self.inputs = {'foo':self.foos, 'bar':self.bars}
- self.cp = CartProduct(self.inputs)
-
- def test_interface(self):
- from pypes.interfaces import IPartialResult
- self.failUnless(verifyObject(IPartialResult, self.cp))
-
- def test_input_map(self):
- self.assertEqual(self.cp.inputMap(), self.inputs)
- self.failIf(self.cp.inputMap() is self.inputs)
+ def inputMap(self):
+ return self._input_map
- def test_criteria(self):
- from pypes.query import CartProduct
- self.assertEqual(self.cp.criteriaExpr(), None)
- criteria = DummyExpr(None)
- cp = CartProduct(self.inputs, criteria)
- self.failUnless(cp.criteriaExpr() is criteria)
+ def criteriaExpr(self):
+ return self._criteria
- def test_names_used(self):
- from pypes.query import CartProduct
- self.failIf(self.cp.namesUsed())
- cp = CartProduct(
- self.inputs, DummyExpr(None, ['kirk', 'spock', 'mccoy']))
- self.assertEqual(cp.namesUsed(), Set(['kirk', 'spock', 'mccoy']))
+ def namesUsed(self):
+ return self._criteria.freeNames(self._input_maps.keys())
- def test_magnitude_all_inputs(self):
- expected = reduce(mul, [len(ipt) for ipt in self.inputs.values()])
- self.assertEqual(self.cp.magnitude(), expected)
- self.assertEqual(self.cp.magnitude('foo', 'bar'), expected)
+ def union(self, other):
+ return DummyPartResult(
+ self._input_map, self._criteria | other._criteria)
- def test_magnitude_one_input(self):
- self.assertEqual(self.cp.magnitude('bar'), len(self.inputs['bar']))
- self.assertEqual(self.cp.magnitude('foo'), len(self.inputs['foo']))
-
- def test_iter_one_no_criteria(self):
- result = list(self.cp.iterResult('foo'))
- self.assertEqual(len(result), len(self.foos) * len(self.bars))
- expected = Set(self.foos)
- result = Set(result)
- self.assertEqual(result, expected)
-
- def test_iter_two_with_criteria(self):
- from pypes.query import CartProduct
- cp = CartProduct(self.inputs,
- lambda ipt: ipt['foo'].foo < 5 and ipt['bar'].bar in ('A', 'B'))
- count = 0
- for foo, bar in cp.iterResult('foo', 'bar'):
- self.failUnless(foo.foo < 5 and bar.bar in ('A', 'B'))
- count += 1
- self.assertEqual(count, len(self.foos))
-
- def test_iter_three_with_fixed(self):
- from pypes.query import CartProduct
- inputs = {'one': [1,2,3,4,5,6],
- 'two': list('ABCDEFGHIJKLMNOP'),
- 'three': [2,4,6,8,10]}
- cp = CartProduct(inputs)
- count = 0
- for one, two, three in cp.iterResult('one', 'two', 'three', two='C'):
- self.assertEqual(two, 'C')
- count += 1
- self.assertEqual(count, len(inputs['one']) * len(inputs['three']))
- count = 0
- for two, three, one in cp.iterResult(
- 'two', 'three', 'one', two='C', one=2):
- self.assertEqual(two, 'C')
- self.assertEqual(one, 2)
- count += 1
- self.assertEqual(count, len(inputs['three']))
-
- def test_iter_with_fixed_fails(self):
- from pypes.exceptions import PypesQueryError
- self.assertRaises(
- StopIteration, self.cp.iterResult('foo', 'bar', bar=Bar()).next)
- self.assertRaises(
- PypesQueryError, self.cp.iterResult('foo', 'bar', baz=None).next)
-
- def test_union_cp_with_cp(self):
- from pypes.query import CartProduct
- cp1 = CartProduct(
- self.inputs, DummyExpr(lambda ipt: ipt['foo'].foo == 3))
- cp2 = CartProduct(
- self.inputs, DummyExpr(lambda ipt: ipt['bar'].bar in ('A', 'B')))
- union = cp1.union(cp2)
- self.failIf(union is cp1)
- self.failIf(union is cp2)
- count = 0
- for foo, bar in union.iterResult('foo', 'bar'):
- self.failUnless(foo.foo == 3 or bar.bar in ('A', 'B'))
- count += 1
- self.assertEqual(count, 44)
-
- def test_intersect_cp_with_cp(self):
- from pypes.query import CartProduct
- cp1 = CartProduct(
- self.inputs, DummyExpr(lambda ipt: ipt['foo'].foo < 5))
- cp2 = CartProduct(
- self.inputs, DummyExpr(lambda ipt: ipt['bar'].bar in ('A', 'B')))
- sect = cp1.intersection(cp2)
- self.failIf(sect is cp1)
- self.failIf(sect is cp2)
- count = 0
- for foo, bar in sect.iterResult('foo', 'bar'):
- self.failUnless(foo.foo < 5 and bar.bar in ('A', 'B'))
- count += 1
- self.assertEqual(count, len(self.foos))
-
- def test_result_set_no_criteria(self):
- self.assertEqual(self.cp.resultSet('foo'), self.inputs['foo'])
- self.assertEqual(self.cp.resultSet('bar'), self.inputs['bar'])
-
- def test_result_set_criteria_tests_one_input(self):
- from pypes.query import CartProduct
- cp = CartProduct(
- self.inputs, DummyExpr(lambda ipt: ipt['foo'].foo == 3,
- names=['foo']))
- self.assertEqual(list(cp.resultSet('foo')),
- [foo for foo in self.inputs['foo'] if foo.foo == 3])
- self.assertEqual(cp.resultSet('bar'), self.inputs['bar'])
- cp = CartProduct(
- self.inputs, DummyExpr(lambda ipt: ipt['foo'].foo == 99,
- names=['foo']))
- self.failIf(cp.resultSet('foo'))
- self.failIf(cp.resultSet('bar'))
+ def intersection(self, other):
+ return DummyPartResult(
+ self._input_map, self._criteria & other._criteria)
- def test_result_set_criteria_tests_all_inputs(self):
+ def iterResult(self, *names, **fixed):
+ # XXX This is lame...
from pypes.query import CartProduct
- cp = CartProduct(
- self.inputs, DummyExpr(lambda ipt: ipt['foo'].foo > 5
- and ipt['bar'].bar in ('B', 'Y'),
- names=['foo', 'bar']))
- expected = Set([foo for foo in self.inputs['foo'] if foo.foo > 5])
- self.assertEqual(cp.resultSet('foo'), expected)
- expected = Set([bar for bar in self.inputs['bar']
- if bar.bar in ('B','Y')])
- self.assertEqual(cp.resultSet('bar'), expected)
-
- def test_result_set_criteria_matches_all(self):
- from pypes.query import CartProduct
- cp = CartProduct(
- self.inputs, DummyExpr(lambda ipt: ipt['foo'].foo < 50
- and ipt['bar'].bar,
- names=['foo', 'bar']))
- self.assertEqual(cp.resultSet('foo'), self.inputs['foo'])
- self.assertEqual(cp.resultSet('bar'), self.inputs['bar'])
+ return CartProduct(self._input_map, self._criteria).iterResult(
+ *names, **fixed)
- def test_result_set_bogus_input(self):
- from pypes.exceptions import PypesQueryError
- self.assertRaises(PypesQueryError, self.cp.resultSet, 'cartouche')
-
-class TestSort(unittest.TestCase):
+ def resultSet(self, name):
+ if self._criteria is None:
+ return self._input_map[name]
+ else:
+ return Set(self.iterResult(name))
+
+
+class QueryTestBase(unittest.TestCase):
+
+ inputs = {'widdy': Set('1234567890'),
+ 'fiddy': Set((2,4,6,8)),
+ 'diddy': Set('ABCDEFGHI')}
+ criteria = None
+ order = None
+ limit = None
def setUp(self):
- self.ordered = [TestClass(foo=i) for i in range(10)]
- scrambled = []
- for i in range(5):
- scrambled.insert(-1, self.ordered[i])
- scrambled.insert(1, self.ordered[9-i])
- self.scrambled = scrambled
-
- def test_sort_simple(self):
- from pypes.query import sort
- sorted = list(sort(self.scrambled, lambda x: x.foo))
- self.assertEqual(sorted, self.ordered)
-
- def test_sort_descending(self):
- from pypes.query import sort, descending
- sorted = list(sort(self.scrambled, lambda x: x.foo, descending))
- self.ordered.reverse()
- self.assertEqual(sorted, self.ordered)
-
- def test_sort_limit_ascending(self):
- from pypes.query import sort
- sorted = list(sort(self.scrambled, lambda x: x.foo, limit=3))
- self.assertEqual(sorted, self.ordered[:3])
-
- def test_sort_limit_descending(self):
- from pypes.query import sort, descending
- sorted = list(
- sort(self.scrambled, lambda x: x.foo, descending, limit=3))
- self.ordered.reverse()
- self.assertEqual(sorted, self.ordered[:3])
+ self.query = self._makeQuery(
+ self.inputs, self.criteria, self.order, self.limit)
- def test_sort_empty(self):
- from pypes.query import sort, descending
- sorted = sort([], lambda x:x)
- self.failIf(sorted)
- sorted = sort([], lambda x:x, descending)
- self.failIf(sorted)
- sorted = sort([], lambda x:x, limit=20)
- self.failIf(sorted)
- sorted = sort([], lambda x:x, descending, limit=5)
- self.failIf(sorted)
-
-class TestJoinPrimitives(unittest.TestCase):
-
- def test_equijoin_one2one(self):
- from pypes.query import equijoin
- us = [TestClass(age=i+10) for i in range(15)]
- them = [TestClass(age=14-i) for i in range(10)]
- expr = lambda x: x.age
- joined = list(equijoin(us, expr, them, expr))
- self.assertEqual(len(joined), 5)
- for left, right in joined:
- self.assertEqual(left.age, right.age)
-
- def test_equijoin_not_hashable(self):
- from pypes.query import equijoin
- left = ['a', 'bc', ['s','a'], []]
- right = ['bc', ['s','a'], ['yo']]
- joined = list(equijoin(left, lambda x: x, right, lambda x: x))
+ def _makeQuery(self, inputs=None, criteria=None, order=None, limit=None):
+ # Create test query instance
+ from pypes.query import Query
+ query = Query(inputs, criteria, order, limit)
+ query._CartProduct = DummyPartResult
+ query._Join = DummyPartResult
+ query._ActualizedResult = DummyPartResult
+ query._QueryResult = DummyResult
+ return query
+
+ def _testCriteria(self, **kw):
+ if self.criteria is None:
+ return True
+ else:
+ return self.criteria(kw)
+
+class TestBasicQueries(QueryTestBase):
+
+ def test_multiple_named_inputs(self):
+ i1, i2 = TestClass(), TestClass()
+ i1.__name__ = 'eenie'
+ i2.__name__ = 'meenie'
+ q = self._makeQuery((i1, i2))
+ self.assertEqual(sort(q.inputs.keys()), ['eenie', 'meenie'])
+
+ def test_single_named_input(self):
+ from pypes.interfaces import ISet
+ ipt = TestClass()
+ ipt.__name__ = 'greedo'
+ directlyProvides(ipt, ISet)
+ q = self._makeQuery(ipt)
+ self.assertEqual(q.inputs.keys(), ['greedo'])
+
+ def test_aliased_inputs(self):
+ from pypes.query import Query
+ ipt = TestClass()
+ ipt.__name__ = 'ignored'
+ q = self._makeQuery({'frick':Set(), 'frack':ipt})
+ self.assertEqual(sort(q.inputs.keys()), ['frack', 'frick'])
+
+ def test_unamed_inputs(self):
+ from pypes.query import Query
+ from pypes.exceptions import QueryInputError
+ named = TestClass()
+ named.__name__ = 'haha'
+ self.assertRaises(QueryInputError, Query, Set())
+ self.assertRaises(QueryInputError, Query, (named, Set()))
+ self.assertRaises(QueryInputError, Query, (Set(), Set(), Set()))
+
+ def test_length_limit_bogus_value(self):
+ from pypes.query import Query
+ from pypes.exceptions import QueryInputError
+ self.assertRaises(QueryInputError,
+ Query, {'foo':Set()}, order=DummyExpr(), limit=0)
+ self.assertRaises(QueryInputError,
+ Query, {'foo':Set()}, order=DummyExpr(), limit=-1)
+ self.assertRaises(QueryInputError,
+ Query, {'foo':Set()}, order=DummyExpr(), limit='blah')
+
+class TestQueryNoCriteria(QueryTestBase):
+
+ def test_interfaces(self):
+ from pypes.query import Query
+ from pypes.interfaces import IQuery, IBasicQueryFactory
+ #self.failUnless(verifyClass(IBasicQueryFactory, Query))
+ self.failUnless(verifyObject(IQuery, self.query))
+
+ def test_inputs(self):
+ self.assertEqual(self.query.inputs, self.inputs)
+
+ def test_criteria_expr(self):
+ self.assertEqual(self.query.criteria, self.criteria)
+
+ def test_order_exprs(self):
+ if self.order is None or isinstance(self.order, tuple):
+ expected = self.order
+ else:
+ expected = (self.order,)
+ self.assertEqual(self.query.order, expected)
+
+ def test_length_limit_(self):
+ self.assertEqual(self.query.limit, self.limit)
+
+ def test_select_returns_IQueryResult(self):
+ from pypes.interfaces import IQueryResult
+ self.failUnless(IQueryResult.isImplementedBy(self.query.select()))
+
+ def test_select_without_args_outputs_all(self):
+ result = self.query.select()
+ self.assertEqual(sort(result.names()), sort(self.inputs.keys()))
+
+ def test_select_single_output(self):
+ expected = Set(self.inputs['diddy'])
+ self.assertEqual(Set(self.query.select('diddy')), expected)
+
+ expected = Set([f for w in self.inputs['widdy']
+ for f in self.inputs['fiddy']
+ if self._testCriteria(widdy=w, fiddy=f)])
+ self.assertEqual(Set(self.query.select('fiddy')), expected)
+ expected = Set([w for w in self.inputs['widdy']
+ for f in self.inputs['fiddy']
+ if self._testCriteria(widdy=w, fiddy=f)])
+ self.assertEqual(Set(self.query.select('widdy')), expected)
+
+ def test_select_two_outputs(self):
+ expected = Set([(w, f) for w in self.inputs['widdy']
+ for f in self.inputs['fiddy']
+ if self._testCriteria(widdy=w, fiddy=f)])
+ self.assertEqual(Set(self.query.select('widdy', 'fiddy')), expected)
+
+ def test_select_all_outputs(self):
+ expected = sort([(w, f, d) for w in self.inputs['widdy']
+ for f in self.inputs['fiddy']
+ for d in self.inputs['diddy']
+ if self._testCriteria(
+ widdy=w, fiddy=f, diddy=d)])
+ self.assertEqual(sort(self.query.select('widdy', 'fiddy', 'diddy')),
+ expected)
+
+ def test_select_bogus_inputs(self):
+ from pypes.exceptions import QueryError
+ self.assertRaises(QueryError, self.query.select, 'biddy')
+ self.assertRaises(
+ QueryError, self.query.select, 'fiddy', 'biddy')
+ self.assertRaises(
+ QueryError, self.query.select, 'fiddy', 'fiddy')
+
+ def test_select_one_multiple_results(self):
+ from pypes.exceptions import MultipleResultsError
+ self.assertRaises(MultipleResultsError, self.query.selectOne)
+ self.assertRaises(MultipleResultsError, self.query.selectOne, 'fiddy')
+ self.assertRaises(MultipleResultsError, self.query.selectOne, 'diddy')
+ self.assertRaises(MultipleResultsError, self.query.selectOne, 'widdy')
+ self.assertRaises(MultipleResultsError,
+ self.query.selectOne, 'widdy', 'diddy')
+ self.assertRaises(MultipleResultsError,
+ self.query.selectOne, 'widdy', 'fiddy', 'diddy')
+
+ def test_select_one_bogus_inputs(self):
+ from pypes.exceptions import QueryError
+ self.assertRaises(QueryError, self.query.selectOne, 'yiddy')
+ self.assertRaises(
+ QueryError, self.query.selectOne, 'widdy', 'yiddy')
+
+ def test_result_set(self):
+ all = [(w, f, d) for w in self.inputs['widdy']
+ for f in self.inputs['fiddy']
+ for d in self.inputs['diddy']
+ if self._testCriteria(widdy=w, fiddy=f, diddy=d)]
self.assertEqual(
- sort(joined), sort([('bc','bc'), (['s','a'],['s','a'])]))
- def test_equijoin_empty(self):
- from pypes.query import equijoin
- joined = equijoin([], lambda x:x, [], lambda x:x)
- self.failIf(list(joined))
- objs = [TestClass() for i in range(5)]
- joined = equijoin(objs, lambda x:x, [], lambda x:x)
- self.failIf(list(joined))
- joined = equijoin([], lambda x:x, objs, lambda x:x)
- self.failIf(list(joined))
-
- def test_equijoin_none_match(self):
- from pypes.query import equijoin
- objs = [TestClass() for i in range(5)]
- joined = equijoin(objs, lambda x: 1, objs, lambda x: -1)
- self.failIf(list(joined))
-
- def test_equijoin_one2many(self):
- from sets import Set
- from pypes.query import equijoin
- colors = ['red', 'orange', 'yellow', 'green']
- left = Set([TestClass(shade=c) for c in colors])
- right = Set()
- for i in range(4):
- for c in colors[1:]:
- right.union_update([TestClass(shade=c) for k in range(i)])
- shade = lambda x: x.shade
- joined_right = Set()
- joined_left = Set()
- for l, r in equijoin(left, shade, right, shade):
- self.assertEqual(l.shade, r.shade)
- self.assertNotEqual(l.shade, 'red')
- joined_left.add(l)
- joined_right.add(r)
- self.assertEqual(len(joined_left), len(left) - 1)
- self.assertEqual(joined_right, right)
- # Test left <=> right switch optimization
- for r, l in equijoin(right, shade, left, shade):
- self.assertEqual(l.shade, r.shade)
- self.assertNotEqual(l.shade, 'red')
- joined_left.add(l)
- joined_right.add(r)
- self.assertEqual(len(joined_left), len(left) - 1)
- self.assertEqual(joined_right, right)
-
- def test_equijoin_many2many(self):
- from pypes.query import equijoin
- big = [TestClass(big=True) for i in range(10)]
- small = [TestClass(big=False) for i in range(10)]
- left = big[4:] + small[4:]
- right = big[:4] + small[:4]
- isbig = lambda x: x.big
- join = list(equijoin(left, isbig, right, isbig))
- self.assertEqual(len(join), len(left) * len(right) // 2)
- for r, l in join:
- self.assertEqual(l.big, r.big)
- # Test left <=> right switch optimization
- join = list(equijoin(right, isbig, left, isbig))
- self.assertEqual(len(join), len(left) * len(right) // 2)
- for r, l in join:
- self.assertEqual(l.big, r.big)
-
- def test_greaterjoin(self):
- from pypes.query import greaterjoin
- us = [TestClass(age=14-i) for i in range(10)]
- them = [TestClass(age=i) for i in range(15)]
- expr = lambda x: x.age
- joined = list(greaterjoin(us, expr, them, expr))
- left2right = {}
- for left, right in joined:
- self.failUnless(left.age > right.age)
- left2right.setdefault(left, []).append(right)
- for left, lesser in left2right.items():
- self.assertEqual(len(lesser), left.age)
-
- def test_greaterjoin_empty(self):
- from pypes.query import greaterjoin
- joined = greaterjoin([], lambda x:x, [], lambda x:x)
- self.failIf(list(joined))
- objs = [TestClass() for i in range(5)]
- joined = greaterjoin(objs, lambda x:x, [], lambda x:x)
- self.failIf(list(joined))
- joined = greaterjoin([], lambda x:x, objs, lambda x:x)
- self.failIf(list(joined))
-
- def test_greaterjoin_none_greater(self):
- from pypes.query import greaterjoin
- objs = [TestClass() for i in range(5)]
- joined = greaterjoin(objs, lambda x: 1, objs, lambda x: 2)
- self.failIf(list(joined))
-
- def test_injoin(self):
- from pypes.query import injoin
- letters = ['a', 'b', 'c', 'd']
- words = ['a', 'big', 'fat', 'hairy', 'deal']
- expr = lambda x: x
- joined = list(injoin(letters, expr, words, expr))
- self.assertEqual(len(joined), 6)
- for left, right in joined:
- self.failUnless(left in right)
-
- def test_injoin_empty(self):
- from pypes.query import injoin
- joined = injoin([], lambda x:x, [], lambda x:x)
- self.failIf(list(joined))
- objs = ['' for i in range(5)]
- joined = injoin(objs, lambda x:x, [], lambda x:x)
- self.failIf(list(joined))
- joined = injoin([], lambda x:x, objs, lambda x:x)
- self.failIf(list(joined))
-
- def test_injoin_none_match(self):
- from pypes.query import injoin
- objs = [TestClass() for i in range(5)]
- joined = injoin(objs, lambda x: 1, objs, lambda x: [])
- self.failIf(list(joined))
-
- def test_injoin_not_hashable(self):
- from pypes.query import injoin
- left = ['a', ['yo'], ['s','a'], []]
- right = ['bc', [['s','a'], ['yo']]]
- joined = list(injoin(left, lambda x: x, right, lambda x: x))
+ self.query.resultSet('widdy'), Set(map(lambda x:x[0], all)))
self.assertEqual(
- sort(joined), sort([(['yo'],[['s','a'], ['yo']]),
- (['s','a'],[['s','a'], ['yo']])]))
+ self.query.resultSet('fiddy'), Set(map(lambda x:x[1], all)))
+ self.assertEqual(
+ self.query.resultSet('diddy'), Set(map(lambda x:x[2], all)))
+ def test_result_set_bogus_input(self):
+ from pypes.exceptions import QueryError
+ self.assertRaises(QueryError, self.query.resultSet, 'qiddy')
-class TestJoin(unittest.TestCase):
+ def test_result_set_no_criteria(self):
+ from pypes.query import Query
+ q = self._makeQuery(self.inputs)
+ self.failUnless(q.resultSet('widdy') is self.inputs['widdy'])
+ self.failUnless(q.resultSet('fiddy') is self.inputs['fiddy'])
+ self.failUnless(q.resultSet('diddy') is self.inputs['diddy'])
- def setUp(self):
- from pypes.query import Join
- self.inputs = {'a': Set(('AAA', 'BAA', 'AAC', 'AAD')),
- 'b': Set(('A', 'B', 'D')),
- 'c': Set (('AA', 'BB', 'CC'))}
- self.criteria = DummyExpr(expr='a == b', names=['a', 'b'])
- self.join = Join(self.inputs, self.criteria)
+ def test_select_one_no_results(self):
+ from pypes.exceptions import NoResultsError
+ from pypes.query import Query
+ criteria = DummyExpr(
+ expr='widdy == diddy', names=['widdy', 'diddy'])
+ q = self._makeQuery(self.inputs, criteria, self.order, self.limit)
+ self.assertRaises(NoResultsError, q.selectOne, 'widdy', 'diddy')
+ self.assertRaises(NoResultsError, q.selectOne, 'diddy', 'widdy')
+ self.assertRaises(NoResultsError, q.selectOne, 'widdy')
+ self.assertRaises(NoResultsError, q.selectOne, 'diddy')
+ self.assertRaises(NoResultsError, q.selectOne)
+
+ def test_select_one(self):
+ from pypes.query import Query
+ criteria=DummyExpr(
+ expr="int(widdy) == fiddy == 6 and diddy == 'H'",
+ names=['widdy', 'fiddy', 'diddy'])
+ q = self._makeQuery(self.inputs, criteria, self.order, self.limit)
+ self.assertEqual(q.selectOne('diddy'), 'H')
+ self.assertEqual(q.selectOne('fiddy'), 6)
+ self.assertEqual(q.selectOne('widdy'), '6')
+ self.assertEqual(q.selectOne('fiddy', 'widdy'), (6, '6'))
+ self.assertEqual(q.selectOne('widdy', 'diddy', 'fiddy'), ('6', 'H', 6))
+
+
+class DummyAsc(DummyExpr):
- def test_interface(self):
- from pypes.query import Join
- from pypes.interfaces import IPartialResult
- self.failUnless(verifyObject(
- IPartialResult,
- Join(self.inputs, DummyExpr(expr='a == b', names=['a', 'b']))))
-
- def test_join_handles_compares(self):
- from pypes.query import Join
- Join(self.inputs, DummyExpr(expr='a == b', names=['a', 'b']))
- Join(self.inputs, DummyExpr(expr='a < b', names=['a', 'b']))
- Join(self.inputs, DummyExpr(expr='a > b', names=['a', 'b']))
- Join(self.inputs, DummyExpr(expr='a in b', names=['a', 'b']))
- Join(self.inputs, DummyExpr(
- expr='a.__name__ == b["name"]', names=['a', 'b']))
-
- def test_join_cant_process(self):
- from pypes.query import Join
- from pypes.exceptions import CantProcess
- inputs = {'a':Set(), 'b':Set(), 'c':Set()}
- self.assertRaises(CantProcess, Join, inputs, DummyExpr(expr='True'))
- self.assertRaises(
- CantProcess, Join, inputs, DummyExpr(expr='a', names=['a']))
- self.assertRaises(
- CantProcess, Join, inputs, DummyExpr(expr='a == 1', names=['a']))
- self.assertRaises(
- CantProcess, Join, inputs, DummyExpr(expr='a == a', names=['a']))
- self.assertRaises(
- CantProcess, Join, inputs,
- DummyExpr(expr='a + b', names=['a', 'b']))
- self.assertRaises(
- CantProcess, Join, inputs,
- DummyExpr(expr='a + b > 0', names=['a', 'b']))
- self.assertRaises(
- CantProcess, Join, inputs,
- DummyExpr(expr='0 < a + b', names=['a', 'b']))
- self.assertRaises(
- CantProcess, Join, inputs,
- DummyExpr(expr='a * b == a + b', names=['a', 'b']))
- self.assertRaises(
- CantProcess, Join, inputs,
- DummyExpr(expr='a == b == c', names=['a', 'b', 'c']))
- self.assertRaises(
- CantProcess, Join, inputs,
- DummyExpr(expr='a >= b', names=['a', 'b']))
- self.assertRaises(
- CantProcess, Join, inputs,
- DummyExpr(expr='a <= b', names=['a', 'b']))
-
- def test_input_map(self):
- self.assertEqual(self.join.inputMap(), self.inputs)
- self.failIf(self.join.inputMap() is self.inputs)
+ implements(IOrderExpression)
+ order = 1
+
+
+class DummyDesc(DummyExpr):
- def test_criteria(self):
- self.failUnless(self.join.criteriaExpr() is self.criteria)
-
- def test_names_used(self):
- self.assertEqual(self.join.namesUsed(), Set(['a', 'b']))
-
- def test_select_joined_inputs(self):
- from pypes.query import Join
- j = Join(self.inputs, DummyExpr(expr='a[-1] == b', names=['a', 'b']))
- expected = sort([(b, a) for a in self.inputs['a']
- for b in self.inputs['b']
- if a[-1] == b])
- self.assertEqual(sort(j.iterResult('b', 'a')), expected)
- j = Join(self.inputs, DummyExpr(expr='b*2 > c', names=['b', 'c']))
- expected = sort([(b, c) for b in self.inputs['b']
- for c in self.inputs['c']
- if b*2 > c])
- self.assertEqual(sort(j.iterResult('b', 'c')), expected)
- j = Join(self.inputs, DummyExpr(expr='b in a', names=['a', 'b']))
- expected = sort([(a, b) for a in self.inputs['a']
- for b in self.inputs['b']
- if b in a])
- self.assertEqual(sort(j.iterResult('a', 'b')), expected)
-
- def test_select_inputs_not_joined(self):
- from pypes.query import Join
- j = Join(self.inputs, DummyExpr(expr='a[-1] == b', names=['a', 'b']))
- expected = Set([(a, c) for a in self.inputs['a']
- for b in self.inputs['b']
- for c in self.inputs['c']
- if a[-1] == b])
- self.assertEqual(Set(j.iterResult('a', 'c')), expected)
-
- def test_select_join_fixed_right(self):
- from pypes.query import Join
- j = Join(self.inputs, DummyExpr(expr='a[-1] == b', names=['a', 'b']))
- expected = Set([a for a in self.inputs['a']
- for b in self.inputs['b']
- if a[-1] == b and b == 'A'])
- self.assertEqual(Set(j.iterResult('a', b='A')), expected)
-
- def test_select_join_fixed_left(self):
- from pypes.query import Join
- j = Join(self.inputs, DummyExpr(expr='a[-1] == b', names=['a', 'b']))
- expected = Set([b for a in self.inputs['a']
- for b in self.inputs['b']
- if a[-1] == b and a == 'BAA'])
- self.assertEqual(Set(j.iterResult('b', a='BAA')), expected)
-
- def test_select_join_fixed_both(self):
- from pypes.query import Join
- j = Join(self.inputs, DummyExpr(expr='a[-1] == b', names=['a', 'b']))
- expected = Set([('AAD', 'D')])
- self.assertEqual(Set(j.iterResult('a', 'b', a='AAD', b='D')), expected)
-
- def test_select_join_fixed_nomatch(self):
- from pypes.query import Join
- j = Join(self.inputs, DummyExpr(expr='a[:2] == c', names=['a', 'c']))
- self.assertRaises(
- StopIteration, j.iterResult('a', 'c', a='AAA', c='CC').next)
- self.assertRaises(
- StopIteration, j.iterResult('a', 'c', a='BCC', c='CC').next)
- self.assertRaises(
- StopIteration, j.iterResult('a', 'c', a='AAA', c='EE').next)
- self.assertRaises(
- StopIteration, j.iterResult('a', 'c', a='REE', c='EE').next)
+ implements(IOrderExpression)
+ order = -1
+
+
+class TestQueryWithCriteria(TestQueryNoCriteria):
- def test_select_join_fixed_bad_input(self):
- from pypes.query import Join
- from pypes.exceptions import PypesQueryError
- j = Join(self.inputs, DummyExpr(expr='a[:2] == c', names=['a', 'c']))
- self.assertRaises(
- PypesQueryError, j.iterResult, 'a', 'c', uncle='bob')
-
- def test_union_joins_one_common_input(self):
- from pypes.query import Join
- j1 = Join(self.inputs, DummyExpr(expr='a[-1] == b', names=['a', 'b']))
- j2 = Join(self.inputs, DummyExpr(expr='a[:2] == c', names=['a', 'c']))
- union = j1.union(j2)
- self.failIf(union is j1)
- self.failIf(union is j2)
- expected = sort([(a, b, c) for a in self.inputs['a']
- for b in self.inputs['b']
- for c in self.inputs['c']
- if a[-1] == b or a[:2] == c])
- self.assertEqual(sort(union.iterResult('a', 'b', 'c')), expected)
-
- def test_left_result_set(self):
- from pypes.query import Join
- j = Join(self.inputs, DummyExpr(expr='a[-1] == b', names=['a', 'b']))
- expected = Set([a for a in self.inputs['a']
- for b in self.inputs['b']
- if a[-1] == b])
- self.assertEqual(j.resultSet('a'), expected)
-
- def test_right_result_set(self):
- from pypes.query import Join
- j = Join(self.inputs, DummyExpr(expr='a[-1] == b', names=['a', 'b']))
- expected = Set([b for a in self.inputs['a']
- for b in self.inputs['b']
- if a[-1] == b])
- self.assertEqual(j.resultSet('b'), expected)
-
- def test_result_set_not_in_criteria(self):
- from pypes.query import Join
- j = Join(self.inputs, DummyExpr(expr='a[-1] == b', names=['a', 'b']))
- self.assertEqual(j.resultSet('c'), self.inputs['c'])
-
- def test_empty_result_set(self):
- self.failIf(self.join.resultSet('a'))
- self.failIf(self.join.resultSet('b'))
- self.failIf(self.join.resultSet('c'))
+ criteria = DummyExpr(expr='int(widdy) == fiddy', names=('widdy', 'fiddy'))
+
+
+class TestQueryWithSimpleOrder(TestQueryNoCriteria):
- def test_result_set_bogus_input(self):
- from pypes.exceptions import PypesQueryError
- self.assertRaises(PypesQueryError, self.join.resultSet, 'cartouche')
-
+ order = DummyAsc(expr='widdy', names=('widdy',))
+
-class TestActualizedResult(unittest.TestCase):
+class TestQueryWithOrderAndLimit(TestQueryNoCriteria):
- def setUp(self):
- from pypes.query import ActualizedResult
- self.inputs = {'a': Set((10,20,30,40,50,60)),
- 'b': Set('ABCDEFGHIJKLMNOPQRSTUVWXYZ')}
- self.criteria = DummyExpr(expr="a%20 == 0 and b > 'M'", names=['a','b'])
- self.result = ActualizedResult(self.inputs, self.criteria)
+ order = DummyAsc(expr='fiddy', names=('fiddy',))
+ limit = 10
+
+
+class TestQueryWithMultipleOrder(TestQueryNoCriteria):
- def test_interface(self):
- from pypes.interfaces import IPartialResult
- self.failUnless(verifyObject(IPartialResult, self.result))
-
- def test_can_process(self):
- from pypes.query import ActualizedResult
- ActualizedResult(self.inputs, DummyExpr(expr="a == 10", names=['a']))
- ActualizedResult(self.inputs,
- DummyExpr(expr="a == 10 or a == 30", names=['a']))
- ActualizedResult(self.inputs,
- DummyExpr(expr="a == 10 and b in ('a','b')", names=['a', 'b']))
-
- def test_cant_process(self):
- from pypes.query import ActualizedResult
- from pypes.exceptions import CantProcess
- self.assertRaises(CantProcess, ActualizedResult,
- self.inputs, DummyExpr(expr="a == b", names=['a', 'b']))
- self.assertRaises(CantProcess, ActualizedResult,
- self.inputs, DummyExpr(expr="a.startswith(b)", names=['a', 'b']))
-
- def test_input_map(self):
- expected = {
- 'a': Set((20, 40, 60)),
- 'b': Set('NOPQRSTUVWXYZ')}
- self.assertEqual(self.result.inputMap(), expected)
- self.failIf(self.result.inputMap() is self.inputs)
+ order = (DummyDesc(expr='fiddy', names=('fiddy',)),
+ DummyAsc(expr='widdy', names=('widdy',)))
+
+
+class TestQueryOrderWithCriteria(TestQueryWithCriteria):
- def test_criteria(self):
- self.failUnless(self.result.criteriaExpr() is self.criteria)
-
- def test_names_used(self):
- self.assertEqual(self.result.namesUsed(), Set(['a', 'b']))
+ order = DummyDesc(expr='diddy', names=('diddy',))
+ criteria = DummyExpr(expr='int(widdy) == fiddy', names=('widdy', 'fiddy'))
+
+
+class TestQueryOrderWithCriteriaAndLimit(TestQueryWithCriteria):
- def test_no_criteria(self):
- from pypes.query import ActualizedResult
- inputset = Set((1,2,3,4))
- ar = ActualizedResult({'x':inputset})
- self.assertEqual(Set(ar.iterResult('x')), inputset)
-
- def test_select_one_input(self):
- expected = Set([a for a in self.inputs['a'] if a%20 == 0])
- self.assertEqual(Set(self.result.iterResult('a')), expected)
- expected = Set([b for b in self.inputs['b'] if b > 'M'])
- self.assertEqual(Set(self.result.iterResult('b')), expected)
-
- def test_select_multiple_inputs(self):
- expected = sort([(a, b) for a in self.inputs['a']
- for b in self.inputs['b']
- if a%20 == 0 and b > 'M'])
- self.assertEqual(sort(self.result.iterResult('a', 'b')), expected)
- expected = sort([(b, a) for a, b in expected])
- self.assertEqual(sort(self.result.iterResult('b', 'a')), expected)
-
- def test_empty_result(self):
- from pypes.query import ActualizedResult
- ar = ActualizedResult(
- self.inputs, DummyExpr(expr="a > 100", names=['a']))
- self.failIf(list(ar.iterResult('a', 'b')))
- self.failIf(list(ar.iterResult('b', 'a')))
- self.failIf(list(ar.iterResult('a')))
- self.failIf(list(ar.iterResult('b')))
+ order = (DummyAsc(expr='fiddy', names=('fiddy',)),
+ DummyAsc(expr='diddy', names=('diddy',)))
+ criteria = DummyExpr(expr='int(widdy) == fiddy', names=('widdy', 'fiddy'))
+ limit = 20
+
+
+class TestQueryOrderNoCriteria(QueryTestBase):
+
+ def _resultDicts(self):
+ return [{'widdy':w, 'fiddy':f, 'diddy':d}
+ for w in self.inputs['widdy']
+ for f in self.inputs['fiddy']
+ for d in self.inputs['diddy']
+ if self._testCriteria(widdy=w, fiddy=f, diddy=d)]
+
+ def test_asc_order_for_single_output(self):
+ for name, ipt in self.inputs.items():
+ q = self._makeQuery(
+ self.inputs, self.criteria, DummyAsc(name), self.limit)
+ expected = sort(map(lambda x: x[name], self._resultDicts()))
+ if self.limit is not None:
+ expected = expected[:limit]
+ self.assertEqual(list(q.select(name)), expected)
+
+ def test_desc_order_for_single_output(self):
+ for name, ipt in self.inputs.items():
+ q = self._makeQuery(
+ self.inputs, self.criteria, DummyDesc(name), self.limit)
+ expected = sort(map(lambda x: x[name], self._resultDicts()))
+ expected.reverse()
+ if self.limit is not None:
+ expected = expected[:limit]
+ self.assertEqual(list(q.select(name)), expected)
+
+ def _makeMultiSorted(self, OrderClass1, OrderClass2):
+ q = self._makeQuery(self.inputs, self.criteria,
+ (OrderClass1('fiddy', names='fiddy'),
+ OrderClass2('diddy', names='diddy')), self.limit)
+ all = self._resultDicts()
+ sortfunc = lambda x, y: (
+ cmp(x['fiddy'], y['fiddy']) or cmp(x['diddy'], y['diddy']))
+ all.sort(sortfunc)
+ if self.limit is not None:
+ all = all[:self.limit]
+ return q, all
+
+ def test_multiple_asc_single_out(self):
+ q, all = self._makeMultiSorted(DummyAsc, DummyAsc)
+ expected = [x['fiddy'] for x in all]
+ self.assertEqual(list(q.select('fiddy')), expected)
+ expected = [x['diddy'] for x in all]
+ self.assertEqual(list(q.select('diddy')), expected)
+
+ def test_multiple_asc_multiple_out(self):
+ q, all = self._makeMultiSorted(DummyAsc, DummyAsc)
+ expected = [(x['fiddy'], x['diddy']) for x in all]
+ self.assertEqual(list(q.select('fiddy', 'diddy')), expected)
+ expected = [(x['diddy'], x['fiddy']) for x in all]
+ self.assertEqual(list(q.select('diddy', 'fiddy')). expected)
+
+ def test_multiple_desc_single_out(self):
+ q, all = self._makeMultiSorted(DummyDesc, DummyDesc)
+ expected = [x['fiddy'] for x in all]
+ self.assertEqual(list(q.select('fiddy')), expected)
+ expected = [x['diddy'] for x in all]
+ self.assertEqual(list(q.select('diddy')), expected)
+
+ def test_multiple_desc_multiple_out(self):
+ q, all = self._makeMultiSorted(DummyDesc, DummyDesc)
+ expected = [(x['fiddy'], x['diddy']) for x in all]
+ self.assertEqual(list(q.select('fiddy', 'diddy')), expected)
+ expected = [(x['diddy'], x['fiddy']) for x in all]
+ self.assertEqual(list(q.select('diddy', 'fiddy')), expected)
+ self.assertEqual(list(q.select('diddy', 'fiddy')), expected)
+
+ def test_mixed_sort_single_out(self):
+ q, all = self._makeMultiSorted(DummyAsc, DummyDesc)
+ expected = [x['fiddy'] for x in all]
+ self.assertEqual(list(q.select('fiddy')), expected)
+ expected = [x['diddy'] for x in all]
+ self.assertEqual(list(q.select('diddy')), expected)
+
+ def test_mixed_sort_multiple_out(self):
+ q, all = self._makeMultiSorted(DummyAsc, DummyDesc)
+ expected = [(x['fiddy'], x['diddy']) for x in all]
+ self.assertEqual(list(q.select('fiddy', 'diddy')), expected)
+ expected = [(x['diddy'], x['fiddy']) for x in all]
+ self.assertEqual(list(q.select('diddy', 'fiddy')), expected)
+
+
+class TestQueryResult(unittest.TestCase):
- def test_select_fixed_one_input(self):
- self.assertEqual(list(self.result.iterResult('a', a=40)), [40])
- self.assertEqual(
- Set(self.result.iterResult('b', a=40)),
- self.result.inputMap()['b'])
+ def setUp(self):
+ from pypes.query import QueryResult
+ self.query = object()
+ self.inputs = {'jim':Set('ACEGIK'), 'beam':Set('abcdefg')}
+ self.partresult = DummyPartResult(
+ self.inputs,
+ DummyExpr(expr='jim.lower() == beam', names=['jim', 'beam']))
+ self.result = QueryResult(
+ self.query, ('jim', 'beam'), self.partresult)
- def test_select_fixed_two_inputs(self):
- self.assertEqual(
- list(self.result.iterResult('a', 'b', a=20, b='Q')), [(20, 'Q')])
+ def test_interface(self):
+ from pypes.interfaces import IQueryResult
+ self.failUnless(verifyObject(IQueryResult, self.result))
- def test_select_fixed_nomatch(self):
- self.assertRaises(
- StopIteration, self.result.iterResult('a', 'b', a=30).next)
- self.assertRaises(
- StopIteration, self.result.iterResult('a', 'b', b='E').next)
- self.assertRaises(
- StopIteration,
- self.result.iterResult('a', 'b', a=1000, b='Jeez').next)
+ def test_query_attr(self):
+ self.failUnless(self.result.query() is self.query)
- def test_select_fixed_bad_input(self):
- from pypes.exceptions import PypesQueryError
- self.assertRaises(
- PypesQueryError, self.result.iterResult('a', 'b', uncle='bob').next)
-
- def test_result_set_input_in_criteria(self):
- self.assertEqual(self.result.resultSet('a'), Set((20, 40, 60)))
- self.assertEqual(self.result.resultSet('b'), Set('NOPQRSTUVWXYZ'))
-
- def test_empty_result_set(self):
- from pypes.query import ActualizedResult
- ar = ActualizedResult(
- self.inputs, DummyExpr(expr="a > 100", names=['a']))
- self.failIf(ar.resultSet('a'))
- self.failIf(ar.resultSet('b'))
+ def test_names_attr(self):
+ self.assertEqual(sort(self.result.names()), ['beam', 'jim'])
- def test_result_set_bogus_input(self):
- from pypes.exceptions import PypesQueryError
- self.assertRaises(PypesQueryError, self.result.resultSet, 'cleo')
+ def test_evaluated_attr(self):
+ self.failIf(self.result.evaluated())
+
+ def test_iter(self):
+ expected = sort([(j, b) for j in self.inputs['jim']
+ for b in self.inputs['beam']
+ if j.lower() == b])
+ self.assertEqual(sort(iter(self.result)), expected)
+ return expected
+
+ def test_len(self):
+ expected = len(self.test_iter())
+ self.assertEqual(len(self.result), expected)
+ self.failUnless(self.result.evaluated())
+
# Run the whole test module when it is executed as a script.
if __name__ == '__main__':
    unittest.main()
More information about the Zope-CVS mailing list