[Zodb-checkins] CVS: ZODB3/bsddb3Storage/bsddb3Storage/tests - test_autopack.py:1.2
Barry Warsaw
barry@wooz.org
Mon, 11 Nov 2002 17:07:50 -0500
Update of /cvs-repository/ZODB3/bsddb3Storage/bsddb3Storage/tests
In directory cvs.zope.org:/tmp/cvs-serv29779
Modified Files:
test_autopack.py
Log Message:
checkRootUnreachable(), checkCycleUnreachable(): Use separate tests to
distinguish between objects that are unreachable from the root -- and
thus might be collected by autopack -- and cycles that are unreachable
from the root -- which can only be collected by a full pack.
=== ZODB3/bsddb3Storage/bsddb3Storage/tests/test_autopack.py 1.1 => 1.2 ===
--- ZODB3/bsddb3Storage/bsddb3Storage/tests/test_autopack.py:1.1 Fri Nov 8 18:10:16 2002
+++ ZODB3/bsddb3Storage/bsddb3Storage/tests/test_autopack.py Mon Nov 11 17:07:50 2002
@@ -16,11 +16,21 @@
import time
import unittest
+from ZODB import DB
+from ZODB.referencesf import referencesf
from ZODB.tests.MinPO import MinPO
+from Persistence import Persistent
+
from bsddb3Storage.Full import Full
+from bsddb3Storage.Minimal import Minimal
from bsddb3Storage.BerkeleyBase import BerkeleyConfig
from bsddb3Storage.tests.BerkeleyTestBase import BerkeleyTestBase
+ZERO = '\0'*8
+
+class C(Persistent):
+ pass
+
class TestAutopackBase(BerkeleyTestBase):
@@ -43,14 +53,15 @@
# Create the storage
os.mkdir(dir)
try:
- return Full(dir, config=self._config())
+ return self.ConcreteStorage(dir, config=self._config())
except:
self._zap_dbhome(dir)
raise
-
class TestAutopack(TestAutopackBase):
+ ConcreteStorage = Full
+
def checkAutopack(self):
unless = self.failUnless
raises = self.assertRaises
@@ -82,6 +93,8 @@
class TestAutomaticClassicPack(TestAutopackBase):
+ ConcreteStorage = Full
+
def _config(self):
config = BerkeleyConfig()
# Autopack every 3 seconds, 6 seconds into the past, no classic packs
@@ -114,9 +127,128 @@
# The first two revisions should now be gone, but the third should
# still exist because it's the current revision, and we haven't done a
# classic pack.
- raises(KeyError, self._storage.loadSerial, oid, revid1)
- raises(KeyError, self._storage.loadSerial, oid, revid2)
- raises(KeyError, self._storage.loadSerial, oid, revid3)
+ raises(KeyError, storage.loadSerial, oid, revid1)
+ raises(KeyError, storage.loadSerial, oid, revid2)
+ raises(KeyError, storage.loadSerial, oid, revid3)
+
+ def checkCycleUnreachable(self):
+ unless = self.failUnless
+ raises = self.assertRaises
+ storage = self._storage
+ db = DB(storage)
+ conn = db.open()
+ root = conn.root()
+ self._wait_for_next_autopack()
+ # Store an object that's reachable from the root
+ obj1 = C()
+ obj2 = C()
+ obj1.obj = obj2
+ obj2.obj = obj1
+ root.obj = obj1
+ txn = get_transaction()
+ txn.note('root -> obj1 <-> obj2')
+ txn.commit()
+ oid1 = obj1._p_oid
+ oid2 = obj2._p_oid
+ assert oid1 and oid2 and oid1 <> oid2
+ self._wait_for_next_autopack()
+ unless(storage.load(ZERO, ''))
+ unless(storage.load(oid1, ''))
+ unless(storage.load(oid2, ''))
+ # Now unlink it, which should still leave obj1 and obj2 alive
+ del root.obj
+ txn = get_transaction()
+ txn.note('root -X-> obj1 <-> obj2')
+ txn.commit()
+ unless(storage.load(ZERO, ''))
+ unless(storage.load(oid1, ''))
+ unless(storage.load(oid2, ''))
+ # Do an explicit full pack to right now to collect all the old
+ # revisions and the cycle.
+ storage.pack(time.time(), referencesf)
+ # And it should be packed away
+ unless(storage.load(ZERO, ''))
+ raises(KeyError, storage.load, oid1, '')
+ raises(KeyError, storage.load, oid2, '')
+
+
+
+class TestMinimalPack(TestAutopackBase):
+ ConcreteStorage = Minimal
+
+ def _config(self):
+ config = BerkeleyConfig()
+ # Autopack every 3 seconds
+ config.frequency = 3
+ return config
+
+ def checkRootUnreachable(self):
+ unless = self.failUnless
+ raises = self.assertRaises
+ storage = self._storage
+ db = DB(storage)
+ conn = db.open()
+ root = conn.root()
+ self._wait_for_next_autopack()
+ # Store an object that's reachable from the root
+ obj = C()
+ obj.value = 999
+ root.obj = obj
+ txn = get_transaction()
+ txn.note('root -> obj')
+ txn.commit()
+ oid = obj._p_oid
+ assert oid
+ self._wait_for_next_autopack()
+ unless(storage.load(ZERO, ''))
+ unless(storage.load(oid, ''))
+ # Now unlink it
+ del root.obj
+ txn = get_transaction()
+ txn.note('root -X-> obj')
+ txn.commit()
+ # The object should be gone due to reference counting
+ unless(storage.load(ZERO, ''))
+ raises(KeyError, storage.load, oid, '')
+
+ def checkCycleUnreachable(self):
+ unless = self.failUnless
+ raises = self.assertRaises
+ storage = self._storage
+ db = DB(storage)
+ conn = db.open()
+ root = conn.root()
+ self._wait_for_next_autopack()
+ # Store an object that's reachable from the root
+ obj1 = C()
+ obj2 = C()
+ obj1.obj = obj2
+ obj2.obj = obj1
+ root.obj = obj1
+ txn = get_transaction()
+ txn.note('root -> obj1 <-> obj2')
+ txn.commit()
+ oid1 = obj1._p_oid
+ oid2 = obj2._p_oid
+ assert oid1 and oid2 and oid1 <> oid2
+ self._wait_for_next_autopack()
+ unless(storage.load(ZERO, ''))
+ unless(storage.load(oid1, ''))
+ unless(storage.load(oid2, ''))
+ # Now unlink it, which should still leave obj1 and obj2 alive
+ del root.obj
+ txn = get_transaction()
+ txn.note('root -X-> obj1 <-> obj2')
+ txn.commit()
+ unless(storage.load(ZERO, ''))
+ unless(storage.load(oid1, ''))
+ unless(storage.load(oid2, ''))
+ # But the next autopack should collect both obj1 and obj2
+ self._wait_for_next_autopack()
+ # And it should be packed away
+ unless(storage.load(ZERO, ''))
+ raises(KeyError, storage.load, oid1, '')
+ raises(KeyError, storage.load, oid2, '')
@@ -124,6 +256,7 @@
suite = unittest.TestSuite()
suite.addTest(unittest.makeSuite(TestAutopack, 'check'))
suite.addTest(unittest.makeSuite(TestAutomaticClassicPack, 'check'))
+ suite.addTest(unittest.makeSuite(TestMinimalPack, 'check'))
return suite