[Zodb-checkins] CVS: Zope3/lib/python/Persistence/BTrees/tests - __init__.py:1.1.2.1 testBTrees.py:1.1.2.1 testBTreesUnicode.py:1.1.2.1 testConflict.py:1.1.2.1
Jeremy Hylton
jeremy@zope.com
Thu, 21 Feb 2002 17:53:30 -0500
Update of /cvs-repository/Zope3/lib/python/Persistence/BTrees/tests
In directory cvs.zope.org:/tmp/cvs-serv4857/tests
Added Files:
Tag: Zope-3x-branch
__init__.py testBTrees.py testBTreesUnicode.py testConflict.py
Log Message:
Initial checkin of BTrees.
Conflict resolution is broken, but other tests pass. So the tests in
testConflict are disabled for now.
=== Added File Zope3/lib/python/Persistence/BTrees/tests/__init__.py ===
# If tests is a package, debugging is a bit easier.
=== Added File Zope3/lib/python/Persistence/BTrees/tests/testBTrees.py === (677/777 lines abridged)
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
import os
import random
from Persistence.BTrees.OOBTree import OOBTree, OOBucket, OOSet, OOTreeSet
from Persistence.BTrees.IOBTree import IOBTree, IOBucket, IOSet, IOTreeSet
from Persistence.BTrees.IIBTree import IIBTree, IIBucket, IISet, IITreeSet
from Persistence.BTrees.OIBTree import OIBTree, OIBucket, OISet, OITreeSet
from unittest import TestCase, TestSuite, TextTestRunner, makeSuite
from glob import glob
class Base:
""" Tests common to all types: sets, buckets, and BTrees """
def tearDown(self):
self.t = None
del self.t
def _getRoot(self):
from ZODB.FileStorage import FileStorage
from ZODB.DB import DB
n = 'fs_tmp__%s' % os.getpid()
s = FileStorage(n)
db = DB(s)
root = db.open().root()
return root
def _closeDB(self, root):
if root is not None:
root._p_jar._db.close()
def _delDB(self):
for file in glob('fs_tmp__*'):
os.remove(file)
def testLoadAndStore(self):
for i in 0, 10, 1000:
t = self.t.__class__()
[-=- -=- -=- 677 lines omitted -=- -=- -=-]
def test_suite():
TIOBTree = makeSuite(TestIOBTrees, 'test')
TOOBTree = makeSuite(TestOOBTrees, 'test')
TOIBTree = makeSuite(TestOIBTrees, 'test')
TIIBTree = makeSuite(TestIIBTrees, 'test')
TIOSet = makeSuite(TestIOSets, 'test')
TOOSet = makeSuite(TestOOSets, 'test')
TOISet = makeSuite(TestIOSets, 'test')
TIISet = makeSuite(TestOOSets, 'test')
TIOTreeSet = makeSuite(TestIOTreeSets, 'test')
TOOTreeSet = makeSuite(TestOOTreeSets, 'test')
TOITreeSet = makeSuite(TestIOTreeSets, 'test')
TIITreeSet = makeSuite(TestOOTreeSets, 'test')
TIOBucket = makeSuite(TestIOBuckets, 'test')
TOOBucket = makeSuite(TestOOBuckets, 'test')
TOIBucket = makeSuite(TestOIBuckets, 'test')
TIIBucket = makeSuite(TestIIBuckets, 'test')
alltests = TestSuite((TIOSet, TOOSet, TOISet, TIISet,
TIOTreeSet, TOOTreeSet, TOITreeSet, TIITreeSet,
TIOBucket, TOOBucket, TOIBucket, TIIBucket,
TOOBTree, TIOBTree, TOIBTree, TIIBTree))
return alltests
## utility functions
def lsubtract(l1, l2):
l1=list(l1)
l2=list(l2)
l = filter(lambda x, l1=l1: x not in l1, l2)
l = l + filter(lambda x, l2=l2: x not in l2, l1)
return l
def realseq(itemsob):
return map(lambda x: x, itemsob)
def main():
TextTestRunner().run(test_suite())
if __name__ == '__main__':
main()
=== Added File Zope3/lib/python/Persistence/BTrees/tests/testBTreesUnicode.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
__version__ = '$Id: testBTreesUnicode.py,v 1.1.2.1 2002/02/21 22:53:29 jeremy Exp $'
import types
import unittest
from Persistence.BTrees.OOBTree import OOBTree
# When a OOBtree contains unicode strings as keys,
# it is neccessary accessing non-unicode strings are
# either ascii strings or encoded as unicoded using the
# corresponding encoding
encoding = 'ISO-8859-1'
class TestBTreesUnicode(unittest.TestCase):
""" test unicode"""
def setUp(self):
""" setup an OOBTree with some unicode strings """
self.s = unicode('dreit\xe4gigen','latin1')
self.data = [('alien', 284708388),
('k\xf6nnten', 284708389),
('fox', 284708387),
('future', 284708388),
('quick', 284708387),
('zerst\xf6rt', 284708389),
(unicode('dreit\xe4gigen','latin1'), 284708391)
]
self.tree = OOBTree()
for k, v in self.data:
if isinstance(k, types.StringType):
self.tree[unicode(k,'latin1')] = v
else:
self.tree[k] = v
def test1(self):
""" check every item of the tree """
for k, v in self.data:
if isinstance(k, types.StringType):
key = unicode(k, encoding)
else:
key = k
if self.tree[key] != v:
print "fehler"
def test2(self):
""" try to access unicode keys in tree"""
assert self.data[-1][0] == self.s
assert self.tree[self.data[-1][0]] == self.data[-1][1]
assert self.tree[self.s] == self.data[-1][1],''
def test_suite():
return unittest.makeSuite(TestBTreesUnicode)
def main():
unittest.TextTestRunner().run(test_suite())
if __name__ == '__main__':
main()
=== Added File Zope3/lib/python/Persistence/BTrees/tests/testConflict.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
import os
import random
from Persistence.BTrees.OOBTree import OOBTree, OOBucket, OOSet, OOTreeSet
from Persistence.BTrees.IOBTree import IOBTree, IOBucket, IOSet, IOTreeSet
from Persistence.BTrees.IIBTree import IIBTree, IIBucket, IISet, IITreeSet
from Persistence.BTrees.OIBTree import OIBTree, OIBucket, OISet, OITreeSet
from unittest import TestCase, TestSuite, TextTestRunner, makeSuite
from ZODB.POSException import ConflictError
class Base:
""" Tests common to all types: sets, buckets, and BTrees """
def tearDown(self):
self.t = None
del self.t
def _getRoot(self):
from ZODB.FileStorage import FileStorage
from ZODB.DB import DB
n = 'fs_tmp__%s' % os.getpid()
s = FileStorage(n)
db = DB(s)
root = db.open().root()
return root
def _closeDB(self, root):
root._p_jar._db.close()
root = None
def _delDB(self):
os.system('rm fs_tmp__*')
class MappingBase(Base):
""" Tests common to mappings (buckets, btrees) """
def _deletefail(self):
del self.t[1]
def _setupConflict(self):
base=self.t
d={}
for i in range(20):
d[random.randrange(-10000, 10001)]=i*100000
e1={}
while len(e1) < 5:
k=random.randrange(-10000, 10001)
if not d.has_key(k):
e1[k]=len(e1)
e1=e1.items()
e2={}
while len(e2) < 5:
k=random.randrange(-10000, 10001)
if not d.has_key(k) and not e2.has_key(k):
e2[k]=len(e2)
e2=e2.items()
base.update(d)
b1=base.__class__(base)
b2=base.__class__(base)
bm=base.__class__(base)
items=base.items()
return base, b1, b2, bm, e1, e2, items
def testMergeDelete(self):
base, b1, b2, bm, e1, e2, items = self._setupConflict()
del b1[items[0][0]]
del b2[items[5][0]]
del b1[items[-1][0]]
del b2[items[-2][0]]
del bm[items[0][0]]
del bm[items[5][0]]
del bm[items[-1][0]]
del bm[items[-2][0]]
test_merge(base, b1, b2, bm, 'merge delete')
def testMergeDeleteAndUpdate(self):
base, b1, b2, bm, e1, e2, items = self._setupConflict()
del b1[items[0][0]]
b2[items[5][0]]=1
del b1[items[-1][0]]
b2[items[-2][0]]=2
del bm[items[0][0]]
bm[items[5][0]]=1
del bm[items[-1][0]]
bm[items[-2][0]]=2
test_merge(base, b1, b2, bm, 'merge update and delete')
def testMergeUpdate(self):
base, b1, b2, bm, e1, e2, items = self._setupConflict()
b1[items[0][0]]=1
b2[items[5][0]]=2
b1[items[-1][0]]=3
b2[items[-2][0]]=4
bm[items[0][0]]=1
bm[items[5][0]]=2
bm[items[-1][0]]=3
bm[items[-2][0]]=4
test_merge(base, b1, b2, bm, 'merge update')
def testFailMergeDelete(self):
base, b1, b2, bm, e1, e2, items = self._setupConflict()
del b1[items[0][0]]
del b2[items[0][0]]
test_merge(base, b1, b2, bm, 'merge conflicting delete',
should_fail=1)
def testFailMergeUpdate(self):
base, b1, b2, bm, e1, e2, items = self._setupConflict()
b1[items[0][0]]=1
b2[items[0][0]]=2
test_merge(base, b1, b2, bm, 'merge conflicting update',
should_fail=1)
def testFailMergeDeleteAndUpdate(self):
base, b1, b2, bm, e1, e2, items = self._setupConflict()
del b1[items[0][0]]
b2[items[0][0]]=-9
test_merge(base, b1, b2, bm, 'merge conflicting update and delete',
should_fail=1)
def testMergeInserts(self):
base, b1, b2, bm, e1, e2, items = self._setupConflict()
b1[-99999]=-99999
b1[e1[0][0]]=e1[0][1]
b2[99999]=99999
b2[e1[2][0]]=e1[2][1]
bm[-99999]=-99999
bm[e1[0][0]]=e1[0][1]
bm[99999]=99999
bm[e1[2][0]]=e1[2][1]
test_merge(base, b1, b2, bm, 'merge insert')
def testMergeInsertsFromEmpty(self):
base, b1, b2, bm, e1, e2, items = self._setupConflict()
base.clear()
b1.clear()
b2.clear()
bm.clear()
b1.update(e1)
bm.update(e1)
b2.update(e2)
bm.update(e2)
test_merge(base, b1, b2, bm, 'merge insert from empty')
def testMergeEmptyAndFill(self):
base, b1, b2, bm, e1, e2, items = self._setupConflict()
b1.clear()
bm.clear()
b2.update(e2)
bm.update(e2)
test_merge(base, b1, b2, bm, 'merge insert from empty')
def testMergeEmpty(self):
base, b1, b2, bm, e1, e2, items = self._setupConflict()
b1.clear()
bm.clear()
test_merge(base, b1, b2, bm, 'empty one and not other')
def testFailMergeInsert(self):
base, b1, b2, bm, e1, e2, items = self._setupConflict()
b1[-99999]=-99999
b1[e1[0][0]]=e1[0][1]
b2[99999]=99999
b2[e1[0][0]]=e1[0][1]
test_merge(base, b1, b2, bm, 'merge conflicting inserts',
should_fail=1)
class NormalSetTests(Base):
""" Test common to all set types """
class ExtendedSetTests(NormalSetTests):
"Set (as opposed to TreeSet) specific tests."
def _setupConflict(self):
base=self.t
d={}
for i in range(20):
d[random.randrange(-10000, 10001)]=i*100000
e1={}
while len(e1) < 5:
k=random.randrange(-10000, 10001)
if not d.has_key(k):
e1[k]=len(e1)
e1=e1.keys()
e2={}
while len(e2) < 5:
k=random.randrange(-10000, 10001)
if not d.has_key(k) and not e2.has_key(k):
e2[k]=len(e2)
e2=e2.keys()
base.update(d.keys())
b1=base.__class__(base)
b2=base.__class__(base)
bm=base.__class__(base)
items=base.keys()
return base, b1, b2, bm, e1, e2, items
def testMergeDelete(self):
base, b1, b2, bm, e1, e2, items = self._setupConflict()
b1.remove(items[0])
b2.remove(items[5])
b1.remove(items[-1])
b2.remove(items[-2])
bm.remove(items[0])
bm.remove(items[5])
bm.remove(items[-1])
bm.remove(items[-2])
test_merge(base, b1, b2, bm, 'merge delete')
def testFailMergeDelete(self):
base, b1, b2, bm, e1, e2, items = self._setupConflict()
b1.remove(items[0])
b2.remove(items[0])
test_merge(base, b1, b2, bm, 'merge conflicting delete',
should_fail=1)
def testMergeInserts(self):
base, b1, b2, bm, e1, e2, items = self._setupConflict()
b1.insert(-99999)
b1.insert(e1[0])
b2.insert(99999)
b2.insert(e1[2])
bm.insert(-99999)
bm.insert(e1[0])
bm.insert(99999)
bm.insert(e1[2])
test_merge(base, b1, b2, bm, 'merge insert')
def testMergeInsertsFromEmpty(self):
base, b1, b2, bm, e1, e2, items = self._setupConflict()
base.clear()
b1.clear()
b2.clear()
bm.clear()
b1.update(e1)
bm.update(e1)
b2.update(e2)
bm.update(e2)
test_merge(base, b1, b2, bm, 'merge insert from empty')
def testMergeEmptyAndFill(self):
base, b1, b2, bm, e1, e2, items = self._setupConflict()
b1.clear()
bm.clear()
b2.update(e2)
bm.update(e2)
test_merge(base, b1, b2, bm, 'merge insert from empty')
def testMergeEmpty(self):
base, b1, b2, bm, e1, e2, items = self._setupConflict()
b1.clear()
bm.clear()
test_merge(base, b1, b2, bm, 'empty one and not other')
def testFailMergeInsert(self):
base, b1, b2, bm, e1, e2, items = self._setupConflict()
b1.insert(-99999)
b1.insert(e1[0])
b2.insert(99999)
b2.insert(e1[0])
test_merge(base, b1, b2, bm, 'merge conflicting inserts',
should_fail=1)
def test_merge(o1, o2, o3, expect, message='failed to merge', should_fail=0):
s1=o1.__getstate__()
s2=o2.__getstate__()
s3=o3.__getstate__()
expected=expect.__getstate__()
if expected is None: expected=((((),),),)
if should_fail:
try:
merged=o1._p_resolveConflict(s1, s2, s3)
except (ConflictError, ValueError), err:
pass # ConflictError is the only exception that should occur
else:
assert 0, message
else:
merged=o1._p_resolveConflict(s1, s2, s3)
assert merged==expected, message
class BucketTests(MappingBase):
""" Tests common to all buckets """
class BTreeTests(MappingBase):
""" Tests common to all BTrees """
## BTree tests
class TestIOBTrees(BTreeTests, TestCase):
def setUp(self):
self.t = IOBTree()
class TestOOBTrees(BTreeTests, TestCase):
def setUp(self):
self.t = OOBTree()
class TestOIBTrees(BTreeTests, TestCase):
def setUp(self):
self.t = OIBTree()
class TestIIBTrees(BTreeTests, TestCase):
def setUp(self):
self.t = IIBTree()
## Set tests
class TestIOSets(ExtendedSetTests, TestCase):
def setUp(self):
self.t = IOSet()
class TestOOSets(ExtendedSetTests, TestCase):
def setUp(self):
self.t = OOSet()
class TestIISets(ExtendedSetTests, TestCase):
def setUp(self):
self.t = IISet()
class TestOISets(ExtendedSetTests, TestCase):
def setUp(self):
self.t = OISet()
class TestIOTreeSets(NormalSetTests, TestCase):
def setUp(self):
self.t = IOTreeSet()
class TestOOTreeSets(NormalSetTests, TestCase):
def setUp(self):
self.t = OOTreeSet()
class TestIITreeSets(NormalSetTests, TestCase):
def setUp(self):
self.t = IITreeSet()
class TestOITreeSets(NormalSetTests, TestCase):
def setUp(self):
self.t = OITreeSet()
## Bucket tests
class TestIOBuckets(BucketTests, TestCase):
def setUp(self):
self.t = IOBucket()
class TestOOBuckets(BucketTests, TestCase):
def setUp(self):
self.t = OOBucket()
class TestIIBuckets(BucketTests, TestCase):
def setUp(self):
self.t = IIBucket()
class TestOIBuckets(BucketTests, TestCase):
def setUp(self):
self.t = OIBucket()
# XXX disable tests for now
def _test_suite():
TIOBTree = makeSuite(TestIOBTrees, 'test')
TOOBTree = makeSuite(TestOOBTrees, 'test')
TOIBTree = makeSuite(TestOIBTrees, 'test')
TIIBTree = makeSuite(TestIIBTrees, 'test')
TIOSet = makeSuite(TestIOSets, 'test')
TOOSet = makeSuite(TestOOSets, 'test')
TOISet = makeSuite(TestIOSets, 'test')
TIISet = makeSuite(TestOOSets, 'test')
TIOTreeSet = makeSuite(TestIOTreeSets, 'test')
TOOTreeSet = makeSuite(TestOOTreeSets, 'test')
TOITreeSet = makeSuite(TestIOTreeSets, 'test')
TIITreeSet = makeSuite(TestOOTreeSets, 'test')
TIOBucket = makeSuite(TestIOBuckets, 'test')
TOOBucket = makeSuite(TestOOBuckets, 'test')
TOIBucket = makeSuite(TestOIBuckets, 'test')
TIIBucket = makeSuite(TestIIBuckets, 'test')
alltests = TestSuite((TIOSet, TOOSet, TOISet, TIISet,
TIOTreeSet, TOOTreeSet, TOITreeSet, TIITreeSet,
TIOBucket, TOOBucket, TOIBucket, TIIBucket,
TOOBTree, TIOBTree, TOIBTree, TIIBTree))
return alltests
## utility functions
def lsubtract(l1, l2):
l1=list(l1)
l2=list(l2)
l = filter(lambda x, l1=l1: x not in l1, l2)
l = l + filter(lambda x, l2=l2: x not in l2, l1)
return l
def realseq(itemsob):
return map(lambda x: x, itemsob)
def main():
TextTestRunner().run(test_suite())
if __name__ == '__main__':
main()