[Zope3-checkins] CVS: Zope3/src/zodb/storage/tests - test_autopack.py:1.15
Barry Warsaw
barry@zope.com
Wed, 2 Jul 2003 18:38:17 -0400
Update of /cvs-repository/Zope3/src/zodb/storage/tests
In directory cvs.zope.org:/tmp/cvs-serv4233
Modified Files:
test_autopack.py
Log Message:
As with the minimal tests, the full storage race condition tests can't
possibly work any more, so let's just remove the whole mess.
=== Zope3/src/zodb/storage/tests/test_autopack.py 1.14 => 1.15 ===
--- Zope3/src/zodb/storage/tests/test_autopack.py:1.14 Thu Jun 26 13:56:33 2003
+++ Zope3/src/zodb/storage/tests/test_autopack.py Wed Jul 2 18:38:13 2003
@@ -255,267 +255,6 @@
-class RaceConditionBase(BerkeleyTestBase):
- def setUp(self):
- BerkeleyTestBase.setUp(self)
- self._cv = threading.Condition()
- self._storage.cv = self._cv
-
- def tearDown(self):
- # clean up any outstanding transactions
- get_transaction().abort()
- BerkeleyTestBase.tearDown(self)
-
- def _getPackThread(self, storage):
- raise NotImplementedError
-
- def testRaceCondition(self):
- unless = self.failUnless
- storage = self._storage
- db = DB(storage)
- conn = db.open()
- root = conn.root()
- # Start by storing a root reachable object.
- obj1 = C()
- obj1.value = 888
- root.obj1 = obj1
- txn = get_transaction()
- txn.note('root -> obj1')
- txn.commit()
- # Now, start a transaction, store an object, but don't yet complete
- # the transaction. This will ensure that the second object has a tid
- # < packtime, but it won't be root reachable yet.
- obj2 = C()
- t = Transaction()
- storage.tpcBegin(t)
- data, refs = zodb_pickle(obj2)
- obj2sn = storage.store('\0'*7 + '\2', ZERO, data, refs, '', t)
- # Now, acquire the condvar lock and start a thread that will do a
- # pack, up to the _sweep call. Wait for the _mark() call to
- # complete.
- snooze()
- self._cv.acquire()
- packthread = self._getPackThread(storage)
- packthread.start()
- self._cv.wait()
- # Now that the _mark() has finished, complete the transaction, which
- # links the object to root.
- root.obj2 = obj2
- rootsn = storage.getSerial(ZERO)
- data, refs = zodb_pickle(root)
- rootsn = storage.store(ZERO, rootsn, data, refs, '', t)
- storage.tpcVote(t)
- storage.tpcFinish(t)
- # And notify the pack thread that it can do the sweep and collect
- self._cv.notify()
- self._cv.wait()
- # We're done with the condvar and the thread
- self._cv.release()
- packthread.join()
- # Now make sure that all the interesting objects are still available
- rootsn = storage.getSerial(ZERO)
- obj1sn = storage.getSerial('\0'*7 + '\1')
- obj2sn = storage.getSerial('\0'*7 + '\2')
- # obj1 revision was written before the second revision of the root
- unless(obj1sn < rootsn)
- unless(rootsn == obj2sn)
- unless(obj1sn < obj2sn)
-
- def testEarlierRaceCondition(self):
- unless = self.failUnless
- storage = self._storage
- db = DB(storage)
- conn = db.open()
- root = conn.root()
- # Start by storing a root reachable object.
- obj1 = C()
- obj1.value = 888
- root.obj1 = obj1
- txn = get_transaction()
- txn.note('root -> obj1')
- txn.commit()
- # Now, start a transaction, store an object, but don't yet complete
- # the transaction. This will ensure that the second object has a tid
- # < packtime, but it won't be root reachable yet.
- obj2 = C()
- t = Transaction()
- storage.tpcBegin(t)
- # Now, acquire the condvar lock and start a thread that will do a
- # pack, up to the _sweep call. Wait for the _mark() call to
- # complete.
- snooze()
- self._cv.acquire()
- packthread = self._getPackThread(storage)
- packthread.start()
- self._cv.wait()
- data, refs = zodb_pickle(obj2)
- obj2sn = storage.store('\0'*7 + '\2', ZERO, data, refs, '', t)
- # Now that the _mark() has finished, complete the transaction, which
- # links the object to root.
- root.obj2 = obj2
- rootsn = storage.getSerial(ZERO)
- data, refs = zodb_pickle(root)
- rootsn = storage.store(ZERO, rootsn, data, refs, '', t)
- storage.tpcVote(t)
- storage.tpcFinish(t)
- # And notify the pack thread that it can do the sweep and collect
- self._cv.notify()
- self._cv.wait()
- # We're done with the condvar and the thread
- self._cv.release()
- packthread.join()
- # Now make sure that all the interesting objects are still available
- rootsn = storage.getSerial(ZERO)
- obj1sn = storage.getSerial('\0'*7 + '\1')
- obj2sn = storage.getSerial('\0'*7 + '\2')
- # obj1 revision was written before the second revision of the root
- unless(obj1sn < rootsn)
- unless(rootsn == obj2sn)
- unless(obj1sn < obj2sn)
-
-
-
-# Subclass which does ugly things to _dopack so we can actually test the race
-# condition. We need to store a new object in the database between the
-# _mark() call and the _sweep() call.
-class SynchronizedFullStorage(BDBFullStorage):
- # XXX Cut and paste copy from BDBFullStorage, except where indicated
- def _dopack(self, t, gc=True):
- # t is a TimeTime, or time float, convert this to a TimeStamp object,
- # using an algorithm similar to what's used in FileStorage. We know
- # that our transaction ids, a.k.a. revision ids, are timestamps.
- #
- # BAW: should a pack time in the future be a ValueError? We'd have to
- # worry about clock skew, so for now, we just set the pack time to the
- # minimum of t and now.
- packtime = min(t, time.time())
- t0 = TimeStamp(*(time.gmtime(packtime)[:5] + (packtime % 60,)))
- packtid = `t0`
- # Collect all revisions of all objects earlier than the pack time.
- self._lock_acquire()
- try:
- self._withtxn(self._collect_revs, packtid)
- finally:
- self._lock_release()
- # Collect any objects with refcount zero.
- self._lock_acquire()
- try:
- self._withtxn(self._collect_objs)
- finally:
- self._lock_release()
- # If we're not doing a classic pack, we're done.
- if not gc:
- return
- # Do a mark and sweep for garbage collection. Calculate the set of
- # objects reachable from the root. Anything else is a candidate for
- # having all their revisions packed away. The set of reachable
- # objects lives in the _packmark table.
- self._lock_acquire()
- try:
- self._withtxn(self._mark, packtid)
- finally:
- self._lock_release()
- # XXX thread coordination code start
- self.cv.acquire()
- self.cv.notify()
- self.cv.wait()
- # XXX thread coordination code stop
- #
- # Now perform a sweep, using oidqueue to hold all object ids for
- # objects which are not root reachable as of the pack time.
- self._lock_acquire()
- try:
- self._withtxn(self._sweep, packtid)
- finally:
- self._lock_release()
- # Once again, collect any objects with refcount zero due to the mark
- # and sweep garbage collection pass.
- self._lock_acquire()
- try:
- self._withtxn(self._collect_objs)
- finally:
- self._lock_release()
- # XXX thread coordination code start
- self.cv.notify()
- self.cv.release()
- # XXX thread coordination code stop
-
-
-class FullPackThread(threading.Thread):
- def __init__(self, storage):
- threading.Thread.__init__(self)
- self._storage = storage
-
- def run(self):
- self._storage.pack(time.time(), gc=True)
-
-
-class TestFullClassicPackRaceCondition(RaceConditionBase):
- ConcreteStorage = SynchronizedFullStorage
-
- def _getPackThread(self, storage):
- return FullPackThread(storage)
-
-
-
-# Subclass which does ugly things to _dopack so we can actually test the race
-# condition. We need to store a new object in the database between the
-# _mark() call and the _sweep() call.
-class SynchronizedMinimalStorage(BDBMinimalStorage):
- # XXX Cut and paste copy from BDBMinimalStorage, except where indicated
- def _dopack(self):
- # Do a mark and sweep for garbage collection. Calculate the set of
- # objects reachable from the root. Anything else is a candidate for
- # having all their revisions packed away. The set of reachable
- # objects lives in the _packmark table.
- self._lock_acquire()
- try:
- self._withtxn(self._mark)
- finally:
- self._lock_release()
- # XXX thread coordination code start
- self.cv.acquire()
- self.cv.notify()
- self.cv.wait()
- # XXX thread coordination code stop
- #
- # Now perform a sweep, using oidqueue to hold all object ids for
- # objects which are not root reachable as of the pack time.
- self._lock_acquire()
- try:
- self._withtxn(self._sweep)
- finally:
- self._lock_release()
- # Once again, collect any objects with refcount zero due to the mark
- # and sweep garbage collection pass.
- self._lock_acquire()
- try:
- self._withtxn(self._collect_objs)
- finally:
- self._lock_release()
- # XXX thread coordination code start
- self.cv.notify()
- self.cv.release()
- # XXX thread coordination code stop
-
-
-class MinimalPackThread(threading.Thread):
- def __init__(self, storage):
- threading.Thread.__init__(self)
- self._storage = storage
-
- def run(self):
- self._storage.pack(time.time())
-
-
-class TestMinimalClassicPackRaceCondition(RaceConditionBase):
- ConcreteStorage = SynchronizedMinimalStorage
-
- def _getPackThread(self, storage):
- return MinimalPackThread(storage)
-
-
-
def test_suite():
suite = unittest.TestSuite()
suite.level = 2
@@ -523,13 +262,6 @@
suite.addTest(unittest.makeSuite(TestAutopack))
suite.addTest(unittest.makeSuite(TestAutomaticClassicPack))
suite.addTest(unittest.makeSuite(TestMinimalPack))
- suite.addTest(unittest.makeSuite(TestFullClassicPackRaceCondition))
- # These tests cannot work for minimal storage with the new locking
- # regime. The test purposefully does not complete the in-progress
- # transaction until pack reaches the mark phase, which it can't do
- # because the outer pack() call tries to acquire the commit lock,
- # which /it/ can't get.
- #suite.addTest(unittest.makeSuite(TestMinimalClassicPackRaceCondition))
return suite