[Zope3-checkins] CVS: Zope3/src/zodb/btrees/tests - __init__.py:1.2 test_btrees.py:1.2 test_btreesubclass.py:1.2 test_compare.py:1.2 test_conflict.py:1.2 test_setops.py:1.2 test_unicode.py:1.2
Jim Fulton
jim@zope.com
Wed, 25 Dec 2002 09:13:49 -0500
Update of /cvs-repository/Zope3/src/zodb/btrees/tests
In directory cvs.zope.org:/tmp/cvs-serv15352/src/zodb/btrees/tests
Added Files:
__init__.py test_btrees.py test_btreesubclass.py
test_compare.py test_conflict.py test_setops.py
test_unicode.py
Log Message:
Grand renaming:
- Renamed most files (especially python modules) to lower case.
- Moved views and interfaces into separate hierarchies within each
project, where each top-level directory under the zope package
is a separate project.
- Moved everything to src from lib/python.
lib/python will eventually go away. I need access to the cvs
repository to make this happen, however.
There are probably some bits that are broken. All tests pass
and zope runs, but I haven't tried everything. There are a number
of cleanups I'll work on tomorrow.
=== Zope3/src/zodb/btrees/tests/__init__.py 1.1 => 1.2 ===
--- /dev/null Wed Dec 25 09:13:48 2002
+++ Zope3/src/zodb/btrees/tests/__init__.py Wed Dec 25 09:12:17 2002
@@ -0,0 +1,14 @@
+##############################################################################
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+# If tests is a package, debugging is a bit easier.
=== Zope3/src/zodb/btrees/tests/test_btrees.py 1.1 => 1.2 === (1145/1245 lines abridged)
--- /dev/null Wed Dec 25 09:13:48 2002
+++ Zope3/src/zodb/btrees/tests/test_btrees.py Wed Dec 25 09:12:17 2002
@@ -0,0 +1,1242 @@
+##############################################################################
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+from zodb.btrees.OOBTree import OOBTree, OOBucket, OOSet, OOTreeSet
+from zodb.btrees.IOBTree import IOBTree, IOBucket, IOSet, IOTreeSet
+from zodb.btrees.IIBTree import IIBTree, IIBucket, IISet, IITreeSet
+from zodb.btrees.OIBTree import OIBTree, OIBucket, OISet, OITreeSet
+
+from transaction import get_transaction
+
+from glob import glob
+import os
+import random
+from unittest import TestCase, TestSuite, TextTestRunner, makeSuite
+
+class Base(TestCase):
+ """ Tests common to all types: sets, buckets, and BTrees """
+ def tearDown(self):
+ self.t = None
+ del self.t
+
+ def _getRoot(self):
+ from zodb.storage.file import DB
+ n = 'fs_tmp__%s' % os.getpid()
+ db = DB(n, cache_size=1)
+ root = db.open().root()
+ return root
+
+ def _closeDB(self, root):
+ if root is not None:
+ root._p_jar._db.close()
+
+ def _delDB(self):
+ for file in glob('fs_tmp__*'):
+ os.remove(file)
+
+ def testLoadAndStore(self):
[-=- -=- -=- 1145 lines omitted -=- -=- -=-]
+ self.t = OIBTree()
+class OOBTreeTest(BTreeTests):
+ def setUp(self):
+ self.t = OOBTree()
+
+def test_suite():
+ s = TestSuite()
+
+ for klass in (IIBucketTest, IOBucketTest, OIBucketTest, OOBucketTest,
+ IITreeSetTest, IOTreeSetTest, OITreeSetTest, OOTreeSetTest,
+ IISetTest, IOSetTest, OISetTest, OOSetTest,
+ IIBTreeTest, IOBTreeTest, OIBTreeTest, OOBTreeTest,
+ TestIOBTrees, TestOIBTrees, TestIIBTrees, TestIOSets,
+ DegenerateBTree):
+ s.addTest(makeSuite(klass))
+
+ return s
+
+## utility functions
+
+def lsubtract(l1, l2):
+ l1 = list(l1)
+ l2 = list(l2)
+ l = filter(lambda x, l1=l1: x not in l1, l2)
+ l = l + filter(lambda x, l2=l2: x not in l2, l1)
+ return l
+
+def realseq(itemsob):
+ return [x for x in itemsob]
+
+def permutations(x):
+ # Return a list of all permutations of list x.
+ n = len(x)
+ if n <= 1:
+ return [x]
+ result = []
+ x0 = x[0]
+ for i in range(n):
+ # Build the (n-1)! permutations with x[i] in the first position.
+ xcopy = x[:]
+ first, xcopy[i] = xcopy[i], x0
+ result.extend([[first] + p for p in permutations(xcopy[1:])])
+ return result
+
+
+def main():
+ TextTestRunner().run(test_suite())
+
+if __name__ == '__main__':
+ main()
=== Zope3/src/zodb/btrees/tests/test_btreesubclass.py 1.1 => 1.2 ===
--- /dev/null Wed Dec 25 09:13:48 2002
+++ Zope3/src/zodb/btrees/tests/test_btreesubclass.py Wed Dec 25 09:12:17 2002
@@ -0,0 +1,44 @@
+##############################################################################
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+from zodb.btrees.OOBTree import OOBTree, OOBucket, OOSet, OOTreeSet
+
+class B(OOBucket):
+ pass
+
+class T(OOBTree):
+ _bucket_type = B
+
+import unittest
+
+class SubclassTest(unittest.TestCase):
+
+ def testSubclass(self):
+ # test that a subclass that defines _bucket_type gets buckets
+ # of that type
+ t = T()
+
+ # XXX there's no good way to get a bucket at the moment.
+ # XXX __getstate__() is as good as it gets, but the default
+ # XXX getstate explicitly includes the pickle of the bucket
+ # XXX for small trees, so we have to be clever :-(
+
+ # make sure there is more than one bucket in the tree
+ for i in range(1000):
+ t[i] = i
+
+ state = t.__getstate__()
+ self.assert_(state[0][0].__class__ is B)
+
+def test_suite():
+ return unittest.makeSuite(SubclassTest)
=== Zope3/src/zodb/btrees/tests/test_compare.py 1.1 => 1.2 ===
--- /dev/null Wed Dec 25 09:13:48 2002
+++ Zope3/src/zodb/btrees/tests/test_compare.py Wed Dec 25 09:12:17 2002
@@ -0,0 +1,73 @@
+##############################################################################
+#
+# Copyright (c) 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""Test errors during comparison of BTree keys."""
+
+import unittest
+
+import zodb.db
+from zodb.storage.mapping import MappingStorage
+
+from zodb.btrees.OOBTree import OOBucket as Bucket, OOSet as Set
+from transaction import get_transaction
+
+class CompareTest(unittest.TestCase):
+
+ s = "A string with hi-bit-set characters: \700\701"
+ u = u"A unicode string"
+
+ def setUp(self):
+ # These defaults only make sense if the default encoding
+ # prevents s from being promoted to Unicode.
+ self.assertRaises(UnicodeError, unicode, self.s)
+
+ # An object needs to be added to the database to
+ self.db = zodb.db.DB(MappingStorage())
+ root = self.db.open().root()
+ self.bucket = root["bucket"] = Bucket()
+ self.set = root["set"] = Set()
+ get_transaction().commit()
+
+ def tearDown(self):
+ self.assert_(self.bucket._p_changed != 2)
+ self.assert_(self.set._p_changed != 2)
+
+ def assertUE(self, callable, *args):
+ self.assertRaises(UnicodeError, callable, *args)
+
+ def testBucketGet(self):
+ self.bucket[self.s] = 1
+ self.assertUE(self.bucket.get, self.u)
+
+ def testSetGet(self):
+ self.set.insert(self.s)
+ self.assertUE(self.set.remove, self.u)
+
+ def testBucketSet(self):
+ self.bucket[self.s] = 1
+ self.assertUE(self.bucket.__setitem__, self.u, 1)
+
+ def testSetSet(self):
+ self.set.insert(self.s)
+ self.assertUE(self.set.insert, self.u)
+
+ def testBucketMinKey(self):
+ self.bucket[self.s] = 1
+ self.assertUE(self.bucket.minKey, self.u)
+
+ def testSetMinKey(self):
+ self.set.insert(self.s)
+ self.assertUE(self.set.minKey, self.u)
+
+def test_suite():
+ return unittest.makeSuite(CompareTest)
=== Zope3/src/zodb/btrees/tests/test_conflict.py 1.1 => 1.2 ===
--- /dev/null Wed Dec 25 09:13:48 2002
+++ Zope3/src/zodb/btrees/tests/test_conflict.py Wed Dec 25 09:12:17 2002
@@ -0,0 +1,430 @@
+##############################################################################
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+import os
+
+from zodb.btrees.OOBTree import OOBTree, OOBucket, OOSet, OOTreeSet
+from zodb.btrees.IOBTree import IOBTree, IOBucket, IOSet, IOTreeSet
+from zodb.btrees.IIBTree import IIBTree, IIBucket, IISet, IITreeSet
+from zodb.btrees.OIBTree import OIBTree, OIBucket, OISet, OITreeSet
+from unittest import TestCase, TestSuite, TextTestRunner, makeSuite
+
+from transaction.interfaces import ConflictError
+
+class Base:
+ """ Tests common to all types: sets, buckets, and BTrees """
+ def tearDown(self):
+ self.t = None
+ del self.t
+
+ def _getRoot(self):
+ from zodb.storage.file import FileStorage
+ from zodb.db import DB
+ n = 'fs_tmp__%s' % os.getpid()
+ s = FileStorage(n)
+ db = DB(s)
+ root = db.open().root()
+ return root
+
+ def _closeDB(self, root):
+ root._p_jar._db.close()
+ root = None
+
+ def _delDB(self):
+ os.system('rm fs_tmp__*')
+
+class MappingBase(Base):
+ """ Tests common to mappings (buckets, btrees) """
+
+ def _deletefail(self):
+ del self.t[1]
+
+ def _setupConflict(self):
+
+ l=[ -5124, -7377, 2274, 8801, -9901, 7327, 1565, 17, -679,
+ 3686, -3607, 14, 6419, -5637, 6040, -4556, -8622, 3847, 7191,
+ -4067]
+
+
+ e1=[(-1704, 0), (5420, 1), (-239, 2), (4024, 3), (-6984, 4)]
+ e2=[(7745, 0), (4868, 1), (-2548, 2), (-2711, 3), (-3154, 4)]
+
+
+ base=self.t
+ base.update([(i, i*i) for i in l[:20]])
+ b1=base.__class__(base)
+ b2=base.__class__(base)
+ bm=base.__class__(base)
+
+ items=base.items()
+
+ return base, b1, b2, bm, e1, e2, items
+
+ def testMergeDelete(self):
+ base, b1, b2, bm, e1, e2, items = self._setupConflict()
+ del b1[items[0][0]]
+ del b2[items[5][0]]
+ del b1[items[-1][0]]
+ del b2[items[-2][0]]
+ del bm[items[0][0]]
+ del bm[items[5][0]]
+ del bm[items[-1][0]]
+ del bm[items[-2][0]]
+ test_merge(base, b1, b2, bm, 'merge delete')
+
+ def testMergeDeleteAndUpdate(self):
+ base, b1, b2, bm, e1, e2, items = self._setupConflict()
+ del b1[items[0][0]]
+ b2[items[5][0]]=1
+ del b1[items[-1][0]]
+ b2[items[-2][0]]=2
+ del bm[items[0][0]]
+ bm[items[5][0]]=1
+ del bm[items[-1][0]]
+ bm[items[-2][0]]=2
+ test_merge(base, b1, b2, bm, 'merge update and delete')
+
+ def testMergeUpdate(self):
+ base, b1, b2, bm, e1, e2, items = self._setupConflict()
+ b1[items[0][0]]=1
+ b2[items[5][0]]=2
+ b1[items[-1][0]]=3
+ b2[items[-2][0]]=4
+ bm[items[0][0]]=1
+ bm[items[5][0]]=2
+ bm[items[-1][0]]=3
+ bm[items[-2][0]]=4
+ test_merge(base, b1, b2, bm, 'merge update')
+
+ def testFailMergeDelete(self):
+ base, b1, b2, bm, e1, e2, items = self._setupConflict()
+ del b1[items[0][0]]
+ del b2[items[0][0]]
+ test_merge(base, b1, b2, bm, 'merge conflicting delete',
+ should_fail=1)
+
+ def testFailMergeUpdate(self):
+ base, b1, b2, bm, e1, e2, items = self._setupConflict()
+ b1[items[0][0]]=1
+ b2[items[0][0]]=2
+ test_merge(base, b1, b2, bm, 'merge conflicting update',
+ should_fail=1)
+
+ def testFailMergeDeleteAndUpdate(self):
+ base, b1, b2, bm, e1, e2, items = self._setupConflict()
+ del b1[items[0][0]]
+ b2[items[0][0]]=-9
+ test_merge(base, b1, b2, bm, 'merge conflicting update and delete',
+ should_fail=1)
+
+ def testMergeInserts(self):
+ base, b1, b2, bm, e1, e2, items = self._setupConflict()
+
+ b1[-99999]=-99999
+ b1[e1[0][0]]=e1[0][1]
+ b2[99999]=99999
+ b2[e1[2][0]]=e1[2][1]
+
+ bm[-99999]=-99999
+ bm[e1[0][0]]=e1[0][1]
+ bm[99999]=99999
+ bm[e1[2][0]]=e1[2][1]
+ test_merge(base, b1, b2, bm, 'merge insert')
+
+ def testMergeInsertsFromEmpty(self):
+ base, b1, b2, bm, e1, e2, items = self._setupConflict()
+
+ base.clear()
+ b1.clear()
+ b2.clear()
+ bm.clear()
+
+ b1.update(e1)
+ bm.update(e1)
+ b2.update(e2)
+ bm.update(e2)
+
+ test_merge(base, b1, b2, bm, 'merge insert from empty')
+
+ def testMergeEmptyAndFill(self):
+ base, b1, b2, bm, e1, e2, items = self._setupConflict()
+
+ b1.clear()
+ bm.clear()
+ b2.update(e2)
+ bm.update(e2)
+
+ test_merge(base, b1, b2, bm, 'merge insert from empty')
+
+ def testMergeEmpty(self):
+ base, b1, b2, bm, e1, e2, items = self._setupConflict()
+
+ b1.clear()
+ bm.clear()
+
+ test_merge(base, b1, b2, bm, 'empty one and not other')
+
+ def testFailMergeInsert(self):
+ base, b1, b2, bm, e1, e2, items = self._setupConflict()
+ b1[-99999]=-99999
+ b1[e1[0][0]]=e1[0][1]
+ b2[99999]=99999
+ b2[e1[0][0]]=e1[0][1]
+ test_merge(base, b1, b2, bm, 'merge conflicting inserts',
+ should_fail=1)
+
+
+class NormalSetTests(Base):
+ """ Test common to all set types """
+
+
+
+class ExtendedSetTests(NormalSetTests):
+ "Set (as opposed to TreeSet) specific tests."
+
+ def _setupConflict(self):
+ l=[ -5124, -7377, 2274, 8801, -9901, 7327, 1565, 17, -679,
+ 3686, -3607, 14, 6419, -5637, 6040, -4556, -8622, 3847, 7191,
+ -4067]
+
+ e1=[-1704, 5420, -239, 4024, -6984]
+ e2=[7745, 4868, -2548, -2711, -3154]
+
+
+ base=self.t
+ base.update(l)
+ b1=base.__class__(base)
+ b2=base.__class__(base)
+ bm=base.__class__(base)
+
+ items=base.keys()
+
+ return base, b1, b2, bm, e1, e2, items
+
+ def testMergeDelete(self):
+ base, b1, b2, bm, e1, e2, items = self._setupConflict()
+ b1.remove(items[0])
+ b2.remove(items[5])
+ b1.remove(items[-1])
+ b2.remove(items[-2])
+ bm.remove(items[0])
+ bm.remove(items[5])
+ bm.remove(items[-1])
+ bm.remove(items[-2])
+ test_merge(base, b1, b2, bm, 'merge delete')
+
+ def testFailMergeDelete(self):
+ base, b1, b2, bm, e1, e2, items = self._setupConflict()
+ b1.remove(items[0])
+ b2.remove(items[0])
+ test_merge(base, b1, b2, bm, 'merge conflicting delete',
+ should_fail=1)
+
+ def testMergeInserts(self):
+ base, b1, b2, bm, e1, e2, items = self._setupConflict()
+
+ b1.insert(-99999)
+ b1.insert(e1[0])
+ b2.insert(99999)
+ b2.insert(e1[2])
+
+ bm.insert(-99999)
+ bm.insert(e1[0])
+ bm.insert(99999)
+ bm.insert(e1[2])
+ test_merge(base, b1, b2, bm, 'merge insert')
+
+ def testMergeInsertsFromEmpty(self):
+ base, b1, b2, bm, e1, e2, items = self._setupConflict()
+
+ base.clear()
+ b1.clear()
+ b2.clear()
+ bm.clear()
+
+ b1.update(e1)
+ bm.update(e1)
+ b2.update(e2)
+ bm.update(e2)
+
+ test_merge(base, b1, b2, bm, 'merge insert from empty')
+
+ def testMergeEmptyAndFill(self):
+ base, b1, b2, bm, e1, e2, items = self._setupConflict()
+
+ b1.clear()
+ bm.clear()
+ b2.update(e2)
+ bm.update(e2)
+
+ test_merge(base, b1, b2, bm, 'merge insert from empty')
+
+ def testMergeEmpty(self):
+ base, b1, b2, bm, e1, e2, items = self._setupConflict()
+
+ b1.clear()
+ bm.clear()
+
+ test_merge(base, b1, b2, bm, 'empty one and not other')
+
+ def testFailMergeInsert(self):
+ base, b1, b2, bm, e1, e2, items = self._setupConflict()
+ b1.insert(-99999)
+ b1.insert(e1[0])
+ b2.insert(99999)
+ b2.insert(e1[0])
+ test_merge(base, b1, b2, bm, 'merge conflicting inserts',
+ should_fail=1)
+
+
+def test_merge(o1, o2, o3, expect, message='failed to merge', should_fail=0):
+ s1=o1.__getstate__()
+ s2=o2.__getstate__()
+ s3=o3.__getstate__()
+ expected=expect.__getstate__()
+ if expected is None: expected=((((),),),)
+
+ if should_fail:
+ try:
+ merged=o1._p_resolveConflict(s1, s2, s3)
+ except (ConflictError, ValueError), err:
+ pass # ConflictError is the only exception that should occur
+ else:
+ assert 0, message
+ else:
+ merged=o1._p_resolveConflict(s1, s2, s3)
+ assert merged==expected, message
+
+class BucketTests(MappingBase):
+ """ Tests common to all buckets """
+
+
+class BTreeTests(MappingBase):
+ """ Tests common to all BTrees """
+
+## BTree tests
+
+class TestIOBTrees(BTreeTests, TestCase):
+ def setUp(self):
+ self.t = IOBTree()
+
+class TestOOBTrees(BTreeTests, TestCase):
+ def setUp(self):
+ self.t = OOBTree()
+
+class TestOIBTrees(BTreeTests, TestCase):
+ def setUp(self):
+ self.t = OIBTree()
+
+class TestIIBTrees(BTreeTests, TestCase):
+ def setUp(self):
+ self.t = IIBTree()
+
+## Set tests
+
+class TestIOSets(ExtendedSetTests, TestCase):
+ def setUp(self):
+ self.t = IOSet()
+
+class TestOOSets(ExtendedSetTests, TestCase):
+ def setUp(self):
+ self.t = OOSet()
+
+class TestIISets(ExtendedSetTests, TestCase):
+ def setUp(self):
+ self.t = IISet()
+
+class TestOISets(ExtendedSetTests, TestCase):
+ def setUp(self):
+ self.t = OISet()
+
+class TestIOTreeSets(NormalSetTests, TestCase):
+ def setUp(self):
+ self.t = IOTreeSet()
+
+class TestOOTreeSets(NormalSetTests, TestCase):
+ def setUp(self):
+ self.t = OOTreeSet()
+
+class TestIITreeSets(NormalSetTests, TestCase):
+ def setUp(self):
+ self.t = IITreeSet()
+
+class TestOITreeSets(NormalSetTests, TestCase):
+ def setUp(self):
+ self.t = OITreeSet()
+
+## Bucket tests
+
+class TestIOBuckets(BucketTests, TestCase):
+ def setUp(self):
+ self.t = IOBucket()
+
+class TestOOBuckets(BucketTests, TestCase):
+ def setUp(self):
+ self.t = OOBucket()
+
+class TestIIBuckets(BucketTests, TestCase):
+ def setUp(self):
+ self.t = IIBucket()
+
+class TestOIBuckets(BucketTests, TestCase):
+ def setUp(self):
+ self.t = OIBucket()
+
+# XXX disable tests for now
+def test_suite():
+ TIOBTree = makeSuite(TestIOBTrees, 'test')
+ TOOBTree = makeSuite(TestOOBTrees, 'test')
+ TOIBTree = makeSuite(TestOIBTrees, 'test')
+ TIIBTree = makeSuite(TestIIBTrees, 'test')
+
+ TIOSet = makeSuite(TestIOSets, 'test')
+ TOOSet = makeSuite(TestOOSets, 'test')
+ TOISet = makeSuite(TestIOSets, 'test')
+ TIISet = makeSuite(TestOOSets, 'test')
+
+ TIOTreeSet = makeSuite(TestIOTreeSets, 'test')
+ TOOTreeSet = makeSuite(TestOOTreeSets, 'test')
+ TOITreeSet = makeSuite(TestIOTreeSets, 'test')
+ TIITreeSet = makeSuite(TestOOTreeSets, 'test')
+
+ TIOBucket = makeSuite(TestIOBuckets, 'test')
+ TOOBucket = makeSuite(TestOOBuckets, 'test')
+ TOIBucket = makeSuite(TestOIBuckets, 'test')
+ TIIBucket = makeSuite(TestIIBuckets, 'test')
+
+ alltests = TestSuite((TIOSet, TOOSet, TOISet, TIISet,
+ TIOTreeSet, TOOTreeSet, TOITreeSet, TIITreeSet,
+ TIOBucket, TOOBucket, TOIBucket, TIIBucket,
+ TOOBTree, TIOBTree, TOIBTree, TIIBTree))
+
+ return alltests
+
+## utility functions
+
+def lsubtract(l1, l2):
+ l1=list(l1)
+ l2=list(l2)
+ l = filter(lambda x, l1=l1: x not in l1, l2)
+ l = l + filter(lambda x, l2=l2: x not in l2, l1)
+ return l
+
+def realseq(itemsob):
+ return map(lambda x: x, itemsob)
+
+def main():
+ TextTestRunner().run(test_suite())
+
+if __name__ == '__main__':
+ main()
=== Zope3/src/zodb/btrees/tests/test_setops.py 1.1 => 1.2 ===
--- /dev/null Wed Dec 25 09:13:48 2002
+++ Zope3/src/zodb/btrees/tests/test_setops.py Wed Dec 25 09:12:17 2002
@@ -0,0 +1,486 @@
+##############################################################################
+#
+# Copyright (c) 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE
+#
+##############################################################################
+import random
+from unittest import TestCase, TestSuite, TextTestRunner, makeSuite
+
+from zodb.btrees.OOBTree import OOBTree, OOBucket, OOSet, OOTreeSet
+from zodb.btrees.IOBTree import IOBTree, IOBucket, IOSet, IOTreeSet
+from zodb.btrees.IIBTree import IIBTree, IIBucket, IISet, IITreeSet
+from zodb.btrees.OIBTree import OIBTree, OIBucket, OISet, OITreeSet
+
+# Subclasses have to set up:
+# builders - functions to build inputs, taking an optional keys arg
+# intersection, union, difference - set to the type-correct versions
+class SetResult(TestCase):
+ def setUp(self):
+ self.Akeys = [1, 3, 5, 6 ]
+ self.Bkeys = [ 2, 3, 4, 6, 7]
+ self.As = [makeset(self.Akeys) for makeset in self.builders]
+ self.Bs = [makeset(self.Bkeys) for makeset in self.builders]
+ self.emptys = [makeset() for makeset in self.builders]
+
+ # Slow but obviously correct Python implementations of basic ops.
+ def _union(self, x, y):
+ result = list(x)
+ for e in y:
+ if e not in result:
+ result.append(e)
+ result.sort()
+ return result
+
+ def _intersection(self, x, y):
+ result = []
+ for e in x:
+ if e in y:
+ result.append(e)
+ return result
+
+ def _difference(self, x, y):
+ result = list(x)
+ for e in y:
+ if e in result:
+ result.remove(e)
+ # Difference preserves LHS values.
+ if hasattr(x, "values"):
+ result = [(k, x[k]) for k in result]
+ return result
+
+ def testNone(self):
+ for op in self.union, self.intersection, self.difference:
+ C = op(None, None)
+ self.assert_(C is None)
+
+ for op in self.union, self.intersection, self.difference:
+ for A in self.As:
+ C = op(A, None)
+ self.assert_(C is A)
+
+ C = op(None, A)
+ if op is self.difference:
+ self.assert_(C is None)
+ else:
+ self.assert_(C is A)
+
+ def testEmptyUnion(self):
+ for A in self.As:
+ for E in self.emptys:
+ C = self.union(A, E)
+ self.assert_(not hasattr(C, "values"))
+ self.assertEqual(list(C), self.Akeys)
+
+ C = self.union(E, A)
+ self.assert_(not hasattr(C, "values"))
+ self.assertEqual(list(C), self.Akeys)
+
+ def testEmptyIntersection(self):
+ for A in self.As:
+ for E in self.emptys:
+ C = self.intersection(A, E)
+ self.assert_(not hasattr(C, "values"))
+ self.assertEqual(list(C), [])
+
+ C = self.intersection(E, A)
+ self.assert_(not hasattr(C, "values"))
+ self.assertEqual(list(C), [])
+
+ def testEmptyDifference(self):
+ for A in self.As:
+ for E in self.emptys:
+ C = self.difference(A, E)
+ # Difference preserves LHS values.
+ self.assertEqual(hasattr(C, "values"), hasattr(A, "values"))
+ if hasattr(A, "values"):
+ self.assertEqual(list(C.items()), list(A.items()))
+ else:
+ self.assertEqual(list(C), self.Akeys)
+
+ C = self.difference(E, A)
+ self.assertEqual(hasattr(C, "values"), hasattr(E, "values"))
+ self.assertEqual(list(C), [])
+
+ def testUnion(self):
+ inputs = self.As + self.Bs
+ for A in inputs:
+ for B in inputs:
+ C = self.union(A, B)
+ self.assert_(not hasattr(C, "values"))
+ self.assertEqual(list(C), self._union(A, B))
+
+ def testIntersection(self):
+ inputs = self.As + self.Bs
+ for A in inputs:
+ for B in inputs:
+ C = self.intersection(A, B)
+ self.assert_(not hasattr(C, "values"))
+ self.assertEqual(list(C), self._intersection(A, B))
+
+ def testDifference(self):
+ inputs = self.As + self.Bs
+ for A in inputs:
+ for B in inputs:
+ C = self.difference(A, B)
+ # Difference preserves LHS values.
+ self.assertEqual(hasattr(C, "values"), hasattr(A, "values"))
+ want = self._difference(A, B)
+ if hasattr(A, "values"):
+ self.assertEqual(list(C.items()), want)
+ else:
+ self.assertEqual(list(C), want)
+
+ def testLargerInputs(self):
+ from random import randint
+ MAXSIZE = 200
+ MAXVAL = 400
+ for i in range(3):
+ n = randint(0, MAXSIZE)
+ Akeys = [randint(1, MAXVAL) for j in range(n)]
+ As = [makeset(Akeys) for makeset in self.builders]
+ Akeys = IISet(Akeys)
+
+ n = randint(0, MAXSIZE)
+ Bkeys = [randint(1, MAXVAL) for j in range(n)]
+ Bs = [makeset(Bkeys) for makeset in self.builders]
+ Bkeys = IISet(Bkeys)
+
+ for op, simulator in ((self.union, self._union),
+ (self.intersection, self._intersection),
+ (self.difference, self._difference)):
+ for A in As:
+ for B in Bs:
+ got = op(A, B)
+ want = simulator(Akeys, Bkeys)
+ self.assertEqual(list(got), want,
+ (A, B, Akeys, Bkeys, list(got), want))
+
+# Given a mapping builder (IIBTree, OOBucket, etc), return a function
+# that builds an object of that type given only a list of keys.
+def makeBuilder(mapbuilder):
+ def result(keys=[], mapbuilder=mapbuilder):
+ return mapbuilder(zip(keys, keys))
+ return result
+
+class PureII(SetResult):
+ from zodb.btrees.IIBTree import union, intersection, difference
+ builders = IISet, IITreeSet, makeBuilder(IIBTree), makeBuilder(IIBucket)
+
+class PureIO(SetResult):
+ from zodb.btrees.IOBTree import union, intersection, difference
+ builders = IOSet, IOTreeSet, makeBuilder(IOBTree), makeBuilder(IOBucket)
+
+class PureOO(SetResult):
+ from zodb.btrees.OOBTree import union, intersection, difference
+ builders = OOSet, OOTreeSet, makeBuilder(OOBTree), makeBuilder(OOBucket)
+
+class PureOI(SetResult):
+ from zodb.btrees.OIBTree import union, intersection, difference
+ builders = OISet, OITreeSet, makeBuilder(OIBTree), makeBuilder(OIBucket)
+
+# Subclasses must set up (as class variables):
+# multiunion, union
+# mkset, mktreeset
+# mkbucket, mkbtree
+class MultiUnion(TestCase):
+
+ def testEmpty(self):
+ self.assertEqual(len(self.multiunion([])), 0)
+
+ def testOne(self):
+ for sequence in [3], range(20), range(-10, 0, 2) + range(1, 10, 2):
+ seq1 = sequence[:]
+ seq2 = sequence[:]
+ seq2.reverse()
+ seqsorted = sequence[:]
+ seqsorted.sort()
+ for seq in seq1, seq2, seqsorted:
+ for builder in self.mkset, self.mktreeset:
+ input = builder(seq)
+ output = self.multiunion([input])
+ self.assertEqual(len(seq), len(output))
+ self.assertEqual(seqsorted, list(output))
+
+ def testValuesIgnored(self):
+ for builder in self.mkbucket, self.mkbtree:
+ input = builder([(1, 2), (3, 4), (5, 6)])
+ output = self.multiunion([input])
+ self.assertEqual([1, 3, 5], list(output))
+
+ def testBigInput(self):
+ N = 100000
+ input = self.mkset(range(N))
+ output = self.multiunion([input] * 10)
+ self.assertEqual(len(output), N)
+ self.assertEqual(output.minKey(), 0)
+ self.assertEqual(output.maxKey(), N-1)
+ self.assertEqual(list(output), range(N))
+
+ def testLotsOfLittleOnes(self):
+ from random import shuffle
+ N = 5000
+ inputs = []
+ mkset, mktreeset = self.mkset, self.mktreeset
+ for i in range(N):
+ base = i * 4 - N
+ inputs.append(mkset([base, base+1]))
+ inputs.append(mktreeset([base+2, base+3]))
+ shuffle(inputs)
+ output = self.multiunion(inputs)
+ self.assertEqual(len(output), N*4)
+ self.assertEqual(list(output), range(-N, 3*N))
+
+ def testFunkyKeyIteration(self):
+ # The internal set iteration protocol allows "iterating over" a
+ # a single key as if it were a set.
+ N = 100
+ union, mkset = self.union, self.mkset
+ slow = mkset()
+ for i in range(N):
+ slow = union(slow, mkset([i]))
+ fast = self.multiunion(range(N)) # acts like N distinct singleton sets
+ self.assertEqual(len(slow), N)
+ self.assertEqual(len(fast), N)
+ self.assertEqual(list(slow), list(fast))
+ self.assertEqual(list(fast), range(N))
+
+class TestIIMultiUnion(MultiUnion):
+ from zodb.btrees.IIBTree import multiunion, union
+ from zodb.btrees.IIBTree import IISet as mkset, IITreeSet as mktreeset
+ from zodb.btrees.IIBTree import IIBucket as mkbucket, IIBTree as mkbtree
+
+class TestIOMultiUnion(MultiUnion):
+ from zodb.btrees.IOBTree import multiunion, union
+ from zodb.btrees.IOBTree import IOSet as mkset, IOTreeSet as mktreeset
+ from zodb.btrees.IOBTree import IOBucket as mkbucket, IOBTree as mkbtree
+
+# Check that various special module functions are and aren't imported from
+# the expected BTree modules.
+class TestImports(TestCase):
+ def testWeightedUnion(self):
+ from zodb.btrees.IIBTree import weightedUnion
+ from zodb.btrees.OIBTree import weightedUnion
+
+ try:
+ from zodb.btrees.IOBTree import weightedUnion
+ except ImportError:
+ pass
+ else:
+ self.fail("IOBTree shouldn't have weightedUnion")
+
+ try:
+ from zodb.btrees.OOBTree import weightedUnion
+ except ImportError:
+ pass
+ else:
+ self.fail("OOBTree shouldn't have weightedUnion")
+
+ def testWeightedIntersection(self):
+ from zodb.btrees.IIBTree import weightedIntersection
+ from zodb.btrees.OIBTree import weightedIntersection
+
+ try:
+ from zodb.btrees.IOBTree import weightedIntersection
+ except ImportError:
+ pass
+ else:
+ self.fail("IOBTree shouldn't have weightedIntersection")
+
+ try:
+ from zodb.btrees.OOBTree import weightedIntersection
+ except ImportError:
+ pass
+ else:
+ self.fail("OOBTree shouldn't have weightedIntersection")
+
+ def testMultiunion(self):
+ from zodb.btrees.IIBTree import multiunion
+ from zodb.btrees.IOBTree import multiunion
+
+ try:
+ from zodb.btrees.OIBTree import multiunion
+ except ImportError:
+ pass
+ else:
+ self.fail("OIBTree shouldn't have multiunion")
+
+ try:
+ from zodb.btrees.OOBTree import multiunion
+ except ImportError:
+ pass
+ else:
+ self.fail("OOBTree shouldn't have multiunion")
+
+# Subclasses must set up (as class variables):
+# weightedUnion, weightedIntersection
+# builders -- sequence of constructors, taking items
+# union, intersection -- the module routines of those names
+# mkbucket -- the module bucket builder
+class Weighted(TestCase):
+
+ def setUp(self):
+ self.Aitems = [(1, 10), (3, 30), (5, 50), (6, 60)]
+ self.Bitems = [(2, 21), (3, 31), (4, 41), (6, 61), (7, 71)]
+
+ self.As = [make(self.Aitems) for make in self.builders]
+ self.Bs = [make(self.Bitems) for make in self.builders]
+ self.emptys = [make([]) for make in self.builders]
+
+ weights = []
+ for w1 in -3, -1, 0, 1, 7:
+ for w2 in -3, -1, 0, 1, 7:
+ weights.append((w1, w2))
+ self.weights = weights
+
+ def testBothNone(self):
+ for op in self.weightedUnion, self.weightedIntersection:
+ w, C = op(None, None)
+ self.assert_(C is None)
+ self.assertEqual(w, 0)
+
+ w, C = op(None, None, 42, 666)
+ self.assert_(C is None)
+ self.assertEqual(w, 0)
+
+ def testLeftNone(self):
+ for op in self.weightedUnion, self.weightedIntersection:
+ for A in self.As + self.emptys:
+ w, C = op(None, A)
+ self.assert_(C is A)
+ self.assertEqual(w, 1)
+
+ w, C = op(None, A, 42, 666)
+ self.assert_(C is A)
+ self.assertEqual(w, 666)
+
+ def testRightNone(self):
+ for op in self.weightedUnion, self.weightedIntersection:
+ for A in self.As + self.emptys:
+ w, C = op(A, None)
+ self.assert_(C is A)
+ self.assertEqual(w, 1)
+
+ w, C = op(A, None, 42, 666)
+ self.assert_(C is A)
+ self.assertEqual(w, 42)
+
+ # If obj is a set, return a bucket with values all 1; else return obj.
+ def _normalize(self, obj):
+ if isaset(obj):
+ obj = self.mkbucket(zip(obj, [1] * len(obj)))
+ return obj
+
+ # Python simulation of weightedUnion.
+ def _wunion(self, A, B, w1=1, w2=1):
+ if isaset(A) and isaset(B):
+ return 1, self.union(A, B).keys()
+ A = self._normalize(A)
+ B = self._normalize(B)
+ result = []
+ for key in self.union(A, B):
+ v1 = A.get(key, 0)
+ v2 = B.get(key, 0)
+ result.append((key, v1*w1 + v2*w2))
+ return 1, result
+
+ def testUnion(self):
+ inputs = self.As + self.Bs + self.emptys
+ for A in inputs:
+ for B in inputs:
+ want_w, want_s = self._wunion(A, B)
+ got_w, got_s = self.weightedUnion(A, B)
+ self.assertEqual(got_w, want_w)
+ if isaset(got_s):
+ self.assertEqual(got_s.keys(), want_s)
+ else:
+ self.assertEqual(got_s.items(), want_s)
+
+ for w1, w2 in self.weights:
+ want_w, want_s = self._wunion(A, B, w1, w2)
+ got_w, got_s = self.weightedUnion(A, B, w1, w2)
+ self.assertEqual(got_w, want_w)
+ if isaset(got_s):
+ self.assertEqual(got_s.keys(), want_s)
+ else:
+ self.assertEqual(got_s.items(), want_s)
+
+ # Python simulation weightedIntersection.
+ def _wintersection(self, A, B, w1=1, w2=1):
+ if isaset(A) and isaset(B):
+ return w1 + w2, self.intersection(A, B).keys()
+ A = self._normalize(A)
+ B = self._normalize(B)
+ result = []
+ for key in self.intersection(A, B):
+ result.append((key, A[key]*w1 + B[key]*w2))
+ return 1, result
+
+ def testIntersection(self):
+ inputs = self.As + self.Bs + self.emptys
+ for A in inputs:
+ for B in inputs:
+ want_w, want_s = self._wintersection(A, B)
+ got_w, got_s = self.weightedIntersection(A, B)
+ self.assertEqual(got_w, want_w)
+ if isaset(got_s):
+ self.assertEqual(got_s.keys(), want_s)
+ else:
+ self.assertEqual(got_s.items(), want_s)
+
+ for w1, w2 in self.weights:
+ want_w, want_s = self._wintersection(A, B, w1, w2)
+ got_w, got_s = self.weightedIntersection(A, B, w1, w2)
+ self.assertEqual(got_w, want_w)
+ if isaset(got_s):
+ self.assertEqual(got_s.keys(), want_s)
+ else:
+ self.assertEqual(got_s.items(), want_s)
+
+# Given a set builder (like OITreeSet or OISet), return a function that
+# takes a list of (key, value) pairs and builds a set out of the keys.
+def itemsToSet(setbuilder):
+ def result(items, setbuilder=setbuilder):
+ return setbuilder([key for key, value in items])
+ return result
+
+class TestWeightedII(Weighted):
+ from zodb.btrees.IIBTree import weightedUnion, weightedIntersection
+ from zodb.btrees.IIBTree import union, intersection
+ from zodb.btrees.IIBTree import IIBucket as mkbucket
+ builders = IIBucket, IIBTree, itemsToSet(IISet), itemsToSet(IITreeSet)
+
+class TestWeightedOI(Weighted):
+ from zodb.btrees.OIBTree import weightedUnion, weightedIntersection
+ from zodb.btrees.OIBTree import union, intersection
+ from zodb.btrees.OIBTree import OIBucket as mkbucket
+ builders = OIBucket, OIBTree, itemsToSet(OISet), itemsToSet(OITreeSet)
+
+
+# 'thing' is a bucket, btree, set or treeset. Return true iff it's one of the
+# latter two.
+def isaset(thing):
+ return not hasattr(thing, 'values')
+
+
+def test_suite():
+ s = TestSuite()
+ for klass in (TestIIMultiUnion, TestIOMultiUnion,
+ TestImports,
+ PureII, PureIO, PureOI, PureOO,
+ TestWeightedII, TestWeightedOI):
+ s.addTest(makeSuite(klass))
+ return s
+
+def main():
+ TextTestRunner().run(test_suite())
+
+if __name__ == '__main__':
+ main()
=== Zope3/src/zodb/btrees/tests/test_unicode.py 1.1 => 1.2 ===
--- /dev/null Wed Dec 25 09:13:49 2002
+++ Zope3/src/zodb/btrees/tests/test_unicode.py Wed Dec 25 09:12:17 2002
@@ -0,0 +1,78 @@
+##############################################################################
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+__version__ = '$Id$'
+
+import types
+import unittest
+from zodb.btrees.OOBTree import OOBTree
+
+# When an OOBtree contains unicode strings as keys,
+# it is neccessary accessing non-unicode strings are
+# either ascii strings or encoded as unicoded using the
+# corresponding encoding
+
+encoding = 'ISO-8859-1'
+
+class TestBTreesUnicode(unittest.TestCase):
+ """ test unicode"""
+
+ def setUp(self):
+ """setup an OOBTree with some unicode strings"""
+
+ self.s = unicode('dreit\xe4gigen', 'latin1')
+
+ self.data = [('alien', 1),
+ ('k\xf6nnten', 2),
+ ('fox', 3),
+ ('future', 4),
+ ('quick', 5),
+ ('zerst\xf6rt', 6),
+ (unicode('dreit\xe4gigen','latin1'), 7),
+ ]
+
+ self.tree = OOBTree()
+ for k, v in self.data:
+ if isinstance(k, types.StringType):
+ k = unicode(k, 'latin1')
+ self.tree[k] = v
+
+ def testAllKeys(self):
+ # check every item of the tree
+ for k, v in self.data:
+ if isinstance(k, types.StringType):
+ k = unicode(k, encoding)
+ self.assert_(self.tree.has_key(k))
+ self.assertEqual(self.tree[k], v)
+
+ def testUnicodeKeys(self):
+ # try to access unicode keys in tree
+ k, v = self.data[-1]
+ self.assertEqual(k, self.s)
+ self.assertEqual(self.tree[k], v)
+ self.assertEqual(self.tree[self.s], v)
+
+ def testAsciiKeys(self):
+ # try to access some "plain ASCII" keys in the tree
+ for k, v in self.data[0], self.data[2]:
+ self.assert_(isinstance(k, types.StringType))
+ self.assertEqual(self.tree[k], v)
+
+def test_suite():
+ return unittest.makeSuite(TestBTreesUnicode)
+
+def main():
+ unittest.TextTestRunner().run(test_suite())
+
+if __name__ == '__main__':
+ main()