[Zope-Checkins] CVS: ZODB3/BDBStorage/tests - test_autopack.py:1.6.2.1 test_whitebox.py:1.4.2.1
Barry Warsaw
barry@wooz.org
Tue, 7 Jan 2003 14:36:24 -0500
Update of /cvs-repository/ZODB3/BDBStorage/tests
In directory cvs.zope.org:/tmp/cvs-serv23965/BDBStorage/tests
Added Files:
Tag: ZODB3-3_1-branch
test_autopack.py test_whitebox.py
Log Message:
Sync'ing with the trunk for BDBStorage; these are new files.
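
For the record, both modules expose a test_suite() hook and can also be run
together.  A minimal driver sketch (not part of this checkin; assumes
BDBStorage and its Berkeley DB bindings are built and importable):

    # Hypothetical driver, not part of this checkin: collect both new
    # test modules through their test_suite() hooks and run them.
    import unittest
    from BDBStorage.tests import test_autopack, test_whitebox

    suite = unittest.TestSuite()
    suite.addTest(test_autopack.test_suite())
    suite.addTest(test_whitebox.test_suite())
    unittest.TextTestRunner(verbosity=2).run(suite)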
=== Added File ZODB3/BDBStorage/tests/test_autopack.py ===
##############################################################################
#
# Copyright (c) 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
import os
import time
import unittest
from ZODB import DB
from ZODB.referencesf import referencesf
from ZODB.tests.MinPO import MinPO
from Persistence import Persistent
import BDBStorage
if BDBStorage.is_available:
    from BDBStorage.BDBFullStorage import BDBFullStorage
    from BDBStorage.BDBMinimalStorage import BDBMinimalStorage
    from BDBStorage.BerkeleyBase import BerkeleyConfig
else:
    # Sigh
    class FakeBaseClass: pass
    BDBFullStorage = BDBMinimalStorage = FakeBaseClass

from BDBStorage.tests.BerkeleyTestBase import BerkeleyTestBase

ZERO = '\0'*8


class C(Persistent):
    pass

class TestAutopackBase(BerkeleyTestBase):
    def _config(self):
        config = BerkeleyConfig()
        # Autopack every 3 seconds, 6 seconds into the past, no classic packs
        config.frequency = 3
        config.packtime = 6
        config.classicpack = 0
        return config

    def _wait_for_next_autopack(self):
        storage = self._storage
        # BAW: this uses a non-public interface
        packtime = storage._autopacker._nextcheck
        while packtime == storage._autopacker._nextcheck:
            time.sleep(1)

    def _mk_dbhome(self, dir):
        # Create the storage
        os.mkdir(dir)
        try:
            return self.ConcreteStorage(dir, config=self._config())
        except:
            self._zap_dbhome(dir)
            raise

class TestAutopack(TestAutopackBase):
    ConcreteStorage = BDBFullStorage

    def checkAutopack(self):
        unless = self.failUnless
        raises = self.assertRaises
        storage = self._storage
        # Wait for an autopack operation to occur, then make three revisions
        # to an object.  Wait for the next autopack operation and make sure
        # all three revisions still exist.  Then sleep 10 seconds and wait
        # for another autopack operation.  Then verify that the first two
        # revisions have been packed away.
        oid = storage.new_oid()
        self._wait_for_next_autopack()
        revid1 = self._dostore(oid, data=MinPO(2112))
        revid2 = self._dostore(oid, revid=revid1, data=MinPO(2113))
        revid3 = self._dostore(oid, revid=revid2, data=MinPO(2114))
        self._wait_for_next_autopack()
        unless(storage.loadSerial(oid, revid1))
        unless(storage.loadSerial(oid, revid2))
        unless(storage.loadSerial(oid, revid3))
        # Should be enough time for the revisions to get packed away
        time.sleep(10)
        self._wait_for_next_autopack()
        # The first two revisions should now be gone, but the third should
        # still exist because it's the current revision, and we haven't done
        # a classic pack.
        raises(KeyError, self._storage.loadSerial, oid, revid1)
        raises(KeyError, self._storage.loadSerial, oid, revid2)
        unless(storage.loadSerial(oid, revid3))

class TestAutomaticClassicPack(TestAutopackBase):
    ConcreteStorage = BDBFullStorage

    def _config(self):
        config = BerkeleyConfig()
        # Autopack every 3 seconds, 6 seconds into the past, with a classic
        # pack on every autopack run
        config.frequency = 3
        config.packtime = 6
        config.classicpack = 1
        return config
    def checkAutomaticClassicPack(self):
        unless = self.failUnless
        raises = self.assertRaises
        storage = self._storage
        # Wait for an autopack operation to occur, then make three revisions
        # to an object.  Wait for the next autopack operation and make sure
        # all three revisions still exist.  Then sleep 10 seconds and wait
        # for another autopack operation.  Then verify that all three
        # revisions have been packed away.
        oid = storage.new_oid()
        self._wait_for_next_autopack()
        revid1 = self._dostore(oid, data=MinPO(2112))
        revid2 = self._dostore(oid, revid=revid1, data=MinPO(2113))
        revid3 = self._dostore(oid, revid=revid2, data=MinPO(2114))
        self._wait_for_next_autopack()
        unless(storage.loadSerial(oid, revid1))
        unless(storage.loadSerial(oid, revid2))
        unless(storage.loadSerial(oid, revid3))
        # Should be enough time for the revisions to get packed away
        time.sleep(10)
        self._wait_for_next_autopack()
        # All three revisions should now be gone: the classic pack garbage
        # collects the object entirely, because it was never made reachable
        # from the root.
        raises(KeyError, storage.loadSerial, oid, revid1)
        raises(KeyError, storage.loadSerial, oid, revid2)
        raises(KeyError, storage.loadSerial, oid, revid3)
    def checkCycleUnreachable(self):
        unless = self.failUnless
        raises = self.assertRaises
        storage = self._storage
        db = DB(storage)
        conn = db.open()
        root = conn.root()
        self._wait_for_next_autopack()
        # Store a cycle of two objects reachable from the root
        obj1 = C()
        obj2 = C()
        obj1.obj = obj2
        obj2.obj = obj1
        root.obj = obj1
        txn = get_transaction()
        txn.note('root -> obj1 <-> obj2')
        txn.commit()
        oid1 = obj1._p_oid
        oid2 = obj2._p_oid
        assert oid1 and oid2 and oid1 != oid2
        self._wait_for_next_autopack()
        unless(storage.load(ZERO, ''))
        unless(storage.load(oid1, ''))
        unless(storage.load(oid2, ''))
        # Now unlink the cycle, which should still leave obj1 and obj2 alive
        del root.obj
        txn = get_transaction()
        txn.note('root -X-> obj1 <-> obj2')
        txn.commit()
        unless(storage.load(ZERO, ''))
        unless(storage.load(oid1, ''))
        unless(storage.load(oid2, ''))
        # Do an explicit full pack right now to collect all the old
        # revisions and the cycle.
        storage.pack(time.time(), referencesf)
        # And it should be packed away
        unless(storage.load(ZERO, ''))
        raises(KeyError, storage.load, oid1, '')
        raises(KeyError, storage.load, oid2, '')

class TestMinimalPack(TestAutopackBase):
    ConcreteStorage = BDBMinimalStorage

    def _config(self):
        config = BerkeleyConfig()
        # Autopack every 3 seconds
        config.frequency = 3
        return config

    def checkRootUnreachable(self):
        unless = self.failUnless
        raises = self.assertRaises
        storage = self._storage
        db = DB(storage)
        conn = db.open()
        root = conn.root()
        self._wait_for_next_autopack()
        # Store an object that's reachable from the root
        obj = C()
        obj.value = 999
        root.obj = obj
        txn = get_transaction()
        txn.note('root -> obj')
        txn.commit()
        oid = obj._p_oid
        assert oid
        self._wait_for_next_autopack()
        unless(storage.load(ZERO, ''))
        unless(storage.load(oid, ''))
        # Now unlink it
        del root.obj
        txn = get_transaction()
        txn.note('root -X-> obj')
        txn.commit()
        # The object should be gone due to reference counting
        unless(storage.load(ZERO, ''))
        raises(KeyError, storage.load, oid, '')

    def checkCycleUnreachable(self):
        unless = self.failUnless
        raises = self.assertRaises
        storage = self._storage
        db = DB(storage)
        conn = db.open()
        root = conn.root()
        self._wait_for_next_autopack()
        # Store a cycle of two objects reachable from the root
        obj1 = C()
        obj2 = C()
        obj1.obj = obj2
        obj2.obj = obj1
        root.obj = obj1
        txn = get_transaction()
        txn.note('root -> obj1 <-> obj2')
        txn.commit()
        oid1 = obj1._p_oid
        oid2 = obj2._p_oid
        assert oid1 and oid2 and oid1 != oid2
        self._wait_for_next_autopack()
        unless(storage.load(ZERO, ''))
        unless(storage.load(oid1, ''))
        unless(storage.load(oid2, ''))
        # Now unlink the cycle; reference counting alone can't collect it,
        # so obj1 and obj2 should still be alive
        del root.obj
        txn = get_transaction()
        txn.note('root -X-> obj1 <-> obj2')
        txn.commit()
        unless(storage.load(ZERO, ''))
        unless(storage.load(oid1, ''))
        unless(storage.load(oid2, ''))
        # But the next autopack should collect both obj1 and obj2
        self._wait_for_next_autopack()
        # And they should be packed away
        unless(storage.load(ZERO, ''))
        raises(KeyError, storage.load, oid1, '')
        raises(KeyError, storage.load, oid2, '')

def test_suite():
    suite = unittest.TestSuite()
    if BDBStorage.is_available:
        suite.addTest(unittest.makeSuite(TestAutopack, 'check'))
        suite.addTest(unittest.makeSuite(TestAutomaticClassicPack, 'check'))
        suite.addTest(unittest.makeSuite(TestMinimalPack, 'check'))
    return suite


if __name__ == '__main__':
    unittest.main(defaultTest='test_suite')
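
The three knobs these tests exercise (frequency, packtime, classicpack)
come straight from BerkeleyConfig as the tests construct it.  A minimal
standalone sketch (not part of the checkin; assumes BDBStorage is
available and that the scratch directory doesn't exist yet), mirroring
what TestAutopackBase._mk_dbhome does:

    # Minimal sketch of the autopack configuration, per the tests above.
    import os
    import time
    from BDBStorage.BDBFullStorage import BDBFullStorage
    from BDBStorage.BerkeleyBase import BerkeleyConfig

    config = BerkeleyConfig()
    config.frequency = 3    # wake the autopack thread every 3 seconds
    config.packtime = 6     # pack away revisions more than 6 seconds old
    config.classicpack = 1  # also do a classic (gc) pack when autopacking

    dbhome = '/tmp/apack-demo'  # hypothetical scratch directory
    os.mkdir(dbhome)
    storage = BDBFullStorage(dbhome, config=config)
    time.sleep(10)          # let at least one autopack cycle fire
    storage.close()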
=== Added File ZODB3/BDBStorage/tests/test_whitebox.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
# Whitebox testing of storage implementation details.
import unittest
from ZODB.utils import U64
from ZODB.tests.MinPO import MinPO
from ZODB.tests.StorageTestBase import zodb_unpickle
import BDBStorage
if BDBStorage.is_available:
    from BDBStorage.BDBMinimalStorage import BDBMinimalStorage
    from BDBStorage.BDBFullStorage import BDBFullStorage
else:
    # Sigh
    class FakeBaseClass: pass
    BDBFullStorage = BDBMinimalStorage = FakeBaseClass

from BDBStorage.tests.ZODBTestBase import ZODBTestBase
from BDBStorage.tests.BerkeleyTestBase import BerkeleyTestBase
from Persistence import Persistent

ZERO = '\0'*8


class Object(Persistent):
    pass

class WhiteboxLowLevelMinimal(BerkeleyTestBase):
    ConcreteStorage = BDBMinimalStorage

    def checkTableConsistencyAfterCommit(self):
        unless = self.failIf
        eq = self.assertEqual
        oid = self._storage.new_oid()
        revid1 = self._dostore(oid, data=11)
        revid2 = self._dostore(oid, revid=revid1, data=12)
        revid3 = self._dostore(oid, revid=revid2, data=13)
        # First off, there should be no entries in the pending table
        unless(self._storage._pending.keys())
        # Also, there should be no entries in the oids table
        unless(self._storage._oids.keys())
        # Now, there should be exactly one oid in the serials table, and
        # exactly one record for that oid in the table too.
        oids = {}
        c = self._storage._serials.cursor()
        try:
            rec = c.first()
            while rec:
                oid, serial = rec
                oids.setdefault(oid, []).append(serial)
                rec = c.next()
        finally:
            c.close()
        eq(len(oids), 1)
        eq(len(oids[oids.keys()[0]]), 1)
        # There should now be exactly one entry in the pickles table.
        pickles = self._storage._pickles.items()
        eq(len(pickles), 1)
        key, data = pickles[0]
        poid = key[:8]
        pserial = key[8:]
        eq(oid, poid)
        eq(revid3, pserial)
        obj = zodb_unpickle(data)
        eq(obj.value, 13)
        # Now verify the refcounts table, which should be empty because the
        # stored object isn't referenced by any other objects.
        eq(len(self._storage._refcounts.keys()), 0)

class WhiteboxHighLevelMinimal(ZODBTestBase):
    ConcreteStorage = BDBMinimalStorage

    def checkReferenceCounting(self):
        eq = self.assertEqual
        obj = MinPO(11)
        self._root.obj = obj
        get_transaction().commit()
        obj.value = 12
        get_transaction().commit()
        obj.value = 13
        get_transaction().commit()
        # Make sure the databases have what we expect
        eq(len(self._storage._serials.items()), 2)
        eq(len(self._storage._pickles.items()), 2)
        # And now refcount out the object
        del self._root.obj
        get_transaction().commit()
        # Verification stage.  Our serials table should have exactly one
        # entry, oid == 0
        keys = self._storage._serials.keys()
        eq(len(keys), 1)
        eq(len(self._storage._serials.items()), 1)
        eq(keys[0], ZERO)
        # The pickles table now should have exactly one revision of the root
        # object, and no revisions of the MinPO object, which should have
        # been collected away.
        pickles = self._storage._pickles.items()
        eq(len(pickles), 1)
        key, data = pickles[0]
        eq(key[:8], ZERO)
        # And that pickle should have no 'obj' attribute.
        unobj = zodb_unpickle(data)
        self.failIf(hasattr(unobj, 'obj'))
        # Our refcounts table should have no entries in it, because the root
        # object is an island.
        eq(len(self._storage._refcounts.keys()), 0)
        # And of course, oids and pendings should be empty too
        eq(len(self._storage._oids.keys()), 0)
        eq(len(self._storage._pending.keys()), 0)
    def checkRecursiveReferenceCounting(self):
        eq = self.assertEqual
        obj1 = Object()
        obj2 = Object()
        obj3 = Object()
        obj4 = Object()
        self._root.obj = obj1
        obj1.obj = obj2
        obj2.obj = obj3
        obj3.obj = obj4
        get_transaction().commit()
        # Make sure the databases have what we expect
        eq(len(self._storage._serials.items()), 5)
        eq(len(self._storage._pickles.items()), 5)
        # And now refcount out the whole chain of objects
        del self._root.obj
        get_transaction().commit()
        # Verification stage.  Our serials table should have exactly one
        # entry, oid == 0
        keys = self._storage._serials.keys()
        eq(len(keys), 1)
        eq(len(self._storage._serials.items()), 1)
        eq(keys[0], ZERO)
        # The pickles table now should have exactly one revision of the root
        # object, and no revisions of any other objects, which should have
        # been collected away.
        pickles = self._storage._pickles.items()
        eq(len(pickles), 1)
        key, data = pickles[0]
        eq(key[:8], ZERO)
        # And that pickle should have no 'obj' attribute.
        unobj = zodb_unpickle(data)
        self.failIf(hasattr(unobj, 'obj'))
        # Our refcounts table should have no entries in it, because the root
        # object is an island.
        eq(len(self._storage._refcounts.keys()), 0)
        # And of course, oids and pendings should be empty too
        eq(len(self._storage._oids.keys()), 0)
        eq(len(self._storage._pending.keys()), 0)

class WhiteboxHighLevelFull(ZODBTestBase):
    ConcreteStorage = BDBFullStorage

    def checkReferenceCounting(self):
        eq = self.assertEqual
        # Make sure the databases have what we expect
        eq(len(self._storage._serials.items()), 1)
        eq(len(self._storage._pickles.items()), 1)
        # Now store an object
        obj = MinPO(11)
        self._root.obj = obj
        get_transaction().commit()
        # Make sure the databases have what we expect
        eq(len(self._storage._serials.items()), 2)
        eq(len(self._storage._pickles.items()), 3)
        obj.value = 12
        get_transaction().commit()
        # Make sure the databases have what we expect
        eq(len(self._storage._serials.items()), 2)
        eq(len(self._storage._pickles.items()), 4)
        obj.value = 13
        get_transaction().commit()
        # Make sure the databases have what we expect
        eq(len(self._storage._serials.items()), 2)
        eq(len(self._storage._pickles.items()), 5)
        # And now refcount out the object
        del self._root.obj
        get_transaction().commit()
        # Verification stage.  Our serials table should still have 2
        # entries, one for the root object and one for the now unlinked
        # MinPO obj.
        keys = self._storage._serials.keys()
        eq(len(keys), 2)
        eq(len(self._storage._serials.items()), 2)
        eq(keys[0], ZERO)
        # The pickles table should now have 6 entries, broken down like so:
        # - 3 revisions of the root object: the initial database-open
        #   revision, the revision that got its obj attribute set, and the
        #   revision that got its obj attribute deleted.
        # - 3 revisions of obj, corresponding to values 11, 12, and 13
        pickles = self._storage._pickles.items()
        eq(len(pickles), 6)
        # Our refcounts table should have one entry in it for the MinPO
        # that's referenced in an earlier revision of the root object
        eq(len(self._storage._refcounts.keys()), 1)
        # And of course, oids and pendings should be empty too
        eq(len(self._storage._oids.keys()), 0)
        eq(len(self._storage._pending.keys()), 0)

def test_suite():
    suite = unittest.TestSuite()
    if BDBStorage.is_available:
        suite.addTest(unittest.makeSuite(WhiteboxLowLevelMinimal, 'check'))
        suite.addTest(unittest.makeSuite(WhiteboxHighLevelMinimal, 'check'))
        suite.addTest(unittest.makeSuite(WhiteboxHighLevelFull, 'check'))
    return suite


if __name__ == '__main__':
    unittest.main(defaultTest='test_suite')
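
One note on the key layout the whitebox tests rely on: entries in the
_pickles table are keyed by the 8-byte oid concatenated with the 8-byte
serial, which is why the tests split key[:8] and key[8:].  A hypothetical
helper (not in the checkin) making that explicit:

    # Split a 16-byte _pickles key into its oid and serial halves,
    # exactly as the whitebox tests do with key[:8] / key[8:].
    def split_pickle_key(key):
        assert len(key) == 16, 'expected 8-byte oid + 8-byte serial'
        return key[:8], key[8:]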