[Zope-Checkins] CVS: ZODB3/BDBStorage/tests - test_autopack.py:1.8
Barry Warsaw
barry@wooz.org
Tue, 21 Jan 2003 14:22:41 -0500
Update of /cvs-repository/ZODB3/BDBStorage/tests
In directory cvs.zope.org:/tmp/cvs-serv18453
Modified Files:
test_autopack.py
Log Message:
Added a few elaborate, thread based tests of the pack race condition
for both full and minimal bdb storages.
=== ZODB3/BDBStorage/tests/test_autopack.py 1.7 => 1.8 ===
--- ZODB3/BDBStorage/tests/test_autopack.py:1.7 Mon Jan 13 19:05:11 2003
+++ ZODB3/BDBStorage/tests/test_autopack.py Tue Jan 21 14:22:39 2003
@@ -12,13 +12,19 @@
#
##############################################################################
+from __future__ import nested_scopes
+
import os
import time
import unittest
+import threading
from ZODB import DB
+from ZODB.Transaction import Transaction
from ZODB.referencesf import referencesf
+from ZODB.TimeStamp import TimeStamp
from ZODB.tests.MinPO import MinPO
+from ZODB.tests.StorageTestBase import zodb_pickle
from Persistence import Persistent
import BDBStorage
@@ -31,9 +37,15 @@
class FakeBaseClass: pass
BDBFullStorage = BDBMinimalStorage = FakeBaseClass
+from BDBStorage import ZERO
from BDBStorage.tests.BerkeleyTestBase import BerkeleyTestBase
-ZERO = '\0'*8
+try:
+ True, False
+except NameError:
+ True = 1
+ False = 0
+
class C(Persistent):
pass
@@ -65,6 +77,7 @@
self._zap_dbhome(dir)
raise
+
class TestAutopack(TestAutopackBase):
ConcreteStorage = BDBFullStorage
@@ -259,6 +272,257 @@
+class RaceConditionBase(BerkeleyTestBase):
+ def setUp(self):
+ BerkeleyTestBase.setUp(self)
+ self._cv = threading.Condition()
+ self._storage.cv = self._cv
+
+ def tearDown(self):
+ # clean up any outstanding transactions
+ get_transaction().abort()
+
+
+
+# Subclass which does ugly things to _dopack so we can actually test the race
+# condition. We need to store a new object in the database between the
+# _mark() call and the _sweep() call.
+class SynchronizedFullStorage(BDBFullStorage):
+ # XXX Cut and paste copy from BDBFullStorage, except where indicated
+ def _dopack(self, t, gc=True):
+ # t is a TimeTime, or time float, convert this to a TimeStamp object,
+ # using an algorithm similar to what's used in FileStorage. We know
+ # that our transaction ids, a.k.a. revision ids, are timestamps.
+ #
+ # BAW: should a pack time in the future be a ValueError? We'd have to
+ # worry about clock skew, so for now, we just set the pack time to the
+ # minimum of t and now.
+ packtime = min(t, time.time())
+ t0 = TimeStamp(*(time.gmtime(packtime)[:5] + (packtime % 60,)))
+ packtid = `t0`
+ # Collect all revisions of all objects earlier than the pack time.
+ self._lock_acquire()
+ try:
+ self._withtxn(self._collect_revs, packtid)
+ finally:
+ self._lock_release()
+ # Collect any objects with refcount zero.
+ self._lock_acquire()
+ try:
+ self._withtxn(self._collect_objs)
+ finally:
+ self._lock_release()
+ # If we're not doing a classic pack, we're done.
+ if not gc:
+ return
+ # Do a mark and sweep for garbage collection. Calculate the set of
+ # objects reachable from the root. Anything else is a candidate for
+ # having all their revisions packed away. The set of reachable
+ # objects lives in the _packmark table.
+ self._lock_acquire()
+ try:
+ self._withtxn(self._mark, packtid)
+ finally:
+ self._lock_release()
+ # XXX thread coordination code start
+ self.cv.acquire()
+ self.cv.notify()
+ self.cv.wait()
+ # XXX thread coordination code stop
+ #
+ # Now perform a sweep, using oidqueue to hold all object ids for
+ # objects which are not root reachable as of the pack time.
+ self._lock_acquire()
+ try:
+ self._withtxn(self._sweep, packtid)
+ finally:
+ self._lock_release()
+ # Once again, collect any objects with refcount zero due to the mark
+ # and sweep garbage collection pass.
+ self._lock_acquire()
+ try:
+ self._withtxn(self._collect_objs)
+ finally:
+ self._lock_release()
+ # XXX thread coordination code start
+ self.cv.notify()
+ self.cv.release()
+ # XXX thread coordination code stop
+
+
+class FullPackThread(threading.Thread):
+ def __init__(self, storage):
+ threading.Thread.__init__(self)
+ self._storage = storage
+
+ def run(self):
+ self._storage.autopack(time.time(), gc=True)
+
+
+class TestFullClassicPackRaceCondition(RaceConditionBase):
+ ConcreteStorage = SynchronizedFullStorage
+
+ def testRaceCondition(self):
+ unless = self.failUnless
+ storage = self._storage
+ db = DB(storage)
+ conn = db.open()
+ root = conn.root()
+ # Start by storing a root reachable object.
+ obj1 = C()
+ obj1.value = 888
+ root.obj1 = obj1
+ txn = get_transaction()
+ txn.note('root -> obj1')
+ txn.commit()
+ # Now, start a transaction, store an object, but don't yet complete
+ # the transaction. This will ensure that the second object has a tid
+ # < packtime, but it won't be root reachable yet.
+ obj2 = C()
+ t = Transaction()
+ storage.tpc_begin(t)
+ obj2sn = storage.store('\0'*7 + '\2', ZERO, zodb_pickle(obj2), '', t)
+ # Now, acquire the condvar lock and start a thread that will do a
+ # pack, up to the _sweep call. Wait for the _mark() call to
+ # complete.
+ now = time.time()
+ while now == time.time():
+ time.sleep(0.5)
+ self._cv.acquire()
+ packthread = FullPackThread(storage)
+ packthread.start()
+ self._cv.wait()
+ # Now that the _mark() has finished, complete the transaction, which
+ # links the object to root.
+ root.obj2 = obj2
+ rootsn = storage.getSerial(ZERO)
+ rootsn = storage.store(ZERO, rootsn, zodb_pickle(root), '', t)
+ storage.tpc_vote(t)
+ storage.tpc_finish(t)
+ # And notify the pack thread that it can do the sweep and collect
+ self._cv.notify()
+ self._cv.wait()
+ # We're done with the condvar and the thread
+ self._cv.release()
+ packthread.join()
+ # Now make sure that all the interesting objects are still available
+ rootsn = storage.getSerial(ZERO)
+ obj1sn = storage.getSerial('\0'*7 + '\1')
+ obj2sn = storage.getSerial('\0'*7 + '\2')
+ # obj1 revision was written before the second revision of the root
+ unless(obj1sn < rootsn)
+ unless(rootsn == obj2sn)
+ unless(obj1sn < obj2sn)
+
+
+
+# Subclass which does ugly things to _dopack so we can actually test the race
+# condition. We need to storage a new object in the database between the
+# _mark() call and the _sweep() call.
+class SynchronizedMinimalStorage(BDBMinimalStorage):
+ # XXX Cut and paste copy from BDBMinimalStorage, except where indicated
+ def _dopack(self):
+ # Do a mark and sweep for garbage collection. Calculate the set of
+ # objects reachable from the root. Anything else is a candidate for
+ # having all their revisions packed away. The set of reachable
+ # objects lives in the _packmark table.
+ self._lock_acquire()
+ try:
+ self._withtxn(self._mark)
+ finally:
+ self._lock_release()
+ # XXX thread coordination code start
+ self.cv.acquire()
+ self.cv.notify()
+ self.cv.wait()
+ # XXX thread coordination code stop
+ #
+ # Now perform a sweep, using oidqueue to hold all object ids for
+ # objects which are not root reachable as of the pack time.
+ self._lock_acquire()
+ try:
+ self._withtxn(self._sweep)
+ finally:
+ self._lock_release()
+ # Once again, collect any objects with refcount zero due to the mark
+ # and sweep garbage collection pass.
+ self._lock_acquire()
+ try:
+ self._withtxn(self._collect_objs)
+ finally:
+ self._lock_release()
+ # XXX thread coordination code start
+ self.cv.notify()
+ self.cv.release()
+ # XXX thread coordination code stop
+
+
+class MinimalPackThread(threading.Thread):
+ def __init__(self, storage):
+ threading.Thread.__init__(self)
+ self._storage = storage
+
+ def run(self):
+ self._storage.pack(time.time(), referencesf)
+
+
+class TestMinimalClassicPackRaceCondition(RaceConditionBase):
+ ConcreteStorage = SynchronizedMinimalStorage
+
+ def testRaceCondition(self):
+ unless = self.failUnless
+ storage = self._storage
+ db = DB(storage)
+ conn = db.open()
+ root = conn.root()
+ # Start by storing a root reachable object.
+ obj1 = C()
+ obj1.value = 888
+ root.obj1 = obj1
+ txn = get_transaction()
+ txn.note('root -> obj1')
+ txn.commit()
+ # Now, start a transaction, store an object, but don't yet complete
+ # the transaction. This will ensure that the second object has a tid
+ # < packtime, but it won't be root reachable yet.
+ obj2 = C()
+ t = Transaction()
+ storage.tpc_begin(t)
+ obj2sn = storage.store('\0'*7 + '\2', ZERO, zodb_pickle(obj2), '', t)
+ # Now, acquire the condvar lock and start a thread that will do a
+ # pack, up to the _sweep call. Wait for the _mark() call to
+ # complete.
+ now = time.time()
+ while now == time.time():
+ time.sleep(0.5)
+ self._cv.acquire()
+ packthread = MinimalPackThread(storage)
+ packthread.start()
+ self._cv.wait()
+ # Now that the _mark() has finished, complete the transaction, which
+ # links the object to root.
+ root.obj2 = obj2
+ rootsn = storage.getSerial(ZERO)
+ rootsn = storage.store(ZERO, rootsn, zodb_pickle(root), '', t)
+ storage.tpc_vote(t)
+ storage.tpc_finish(t)
+ # And notify the pack thread that it can do the sweep and collect
+ self._cv.notify()
+ self._cv.wait()
+ # We're done with the condvar and the thread
+ self._cv.release()
+ packthread.join()
+ # Now make sure that all the interesting objects are still available
+ rootsn = storage.getSerial(ZERO)
+ obj1sn = storage.getSerial('\0'*7 + '\1')
+ obj2sn = storage.getSerial('\0'*7 + '\2')
+ # obj1 revision was written before the second revision of the root
+ unless(obj1sn < rootsn)
+ unless(rootsn == obj2sn)
+ unless(obj1sn < obj2sn)
+
+
+
def test_suite():
suite = unittest.TestSuite()
suite.level = 2
@@ -266,6 +530,8 @@
suite.addTest(unittest.makeSuite(TestAutopack))
suite.addTest(unittest.makeSuite(TestAutomaticClassicPack))
suite.addTest(unittest.makeSuite(TestMinimalPack))
+ suite.addTest(unittest.makeSuite(TestFullClassicPackRaceCondition))
+ suite.addTest(unittest.makeSuite(TestMinimalClassicPackRaceCondition))
return suite