[Zope-Checkins] CVS: ZODB3/ZEO/tests - InvalidationTests.py:1.1.2.2
Jeremy Hylton
jeremy@zope.com
Wed, 11 Jun 2003 18:45:08 -0400
Update of /cvs-repository/ZODB3/ZEO/tests
In directory cvs.zope.org:/tmp/cvs-serv2929
Modified Files:
Tag: tim-loading_oids_status-branch
InvalidationTests.py
Log Message:
Add an invalidation stress test that involves versions.
Bump the delay so that the version test sometimes gets a bucket split.
=== ZODB3/ZEO/tests/InvalidationTests.py 1.1.2.1 => 1.1.2.2 ===
--- ZODB3/ZEO/tests/InvalidationTests.py:1.1.2.1 Wed Jun 11 17:35:10 2003
+++ ZODB3/ZEO/tests/InvalidationTests.py Wed Jun 11 18:45:08 2003
@@ -21,7 +21,8 @@
from ZEO.tests.TestThread import TestThread
from ZODB.DB import DB
-from ZODB.POSException import ReadConflictError, ConflictError
+from ZODB.POSException \
+ import ReadConflictError, ConflictError, VersionLockError
class StressThread(TestThread):
@@ -64,10 +65,83 @@
key += self.step
cn.close()
+class VersionStressThread(TestThread):
+
+ def __init__(self, testcase, db, stop, threadnum, startnum,
+ step=2, sleep=None):
+ TestThread.__init__(self, testcase)
+ self.db = db
+ self.stop = stop
+ self.threadnum = threadnum
+ self.startnum = startnum
+ self.step = step
+ self.sleep = sleep
+ self.added_keys = []
+
+ def testrun(self):
+ commit = 0
+ key = self.startnum
+ while not self.stop.isSet():
+ version = "%s:%s" % (self.threadnum, key)
+ commit = not commit
+ if self.oneupdate(version, key, commit):
+ self.added_keys.append(key)
+ key += self.step
+
+ def oneupdate(self, version, key, commit=1):
+ # The mess of sleeps below were added to reduce the number
+ # of VersionLockErrors, based on empirical observation.
+ # It looks like the threads don't switch enough without
+ # the sleeps.
+
+ cn = self.db.open(version)
+ while not self.stop.isSet():
+ try:
+ tree = cn.root()["tree"]
+ break
+ except (ConflictError, KeyError):
+ get_transaction().abort()
+ cn.sync()
+ while not self.stop.isSet():
+ try:
+ tree[key] = self.threadnum
+ get_transaction().commit()
+ if self.sleep:
+ time.sleep(self.sleep)
+ break
+ except (VersionLockError, ReadConflictError, ConflictError), msg:
+ get_transaction().abort()
+ # sync() is necessary here to process invalidations
+ # if we get a read conflict. In the read conflict case,
+ # no objects were modified so cn never got registered
+ # with the transaction.
+ cn.sync()
+ if self.sleep:
+ time.sleep(self.sleep)
+ try:
+ while not self.stop.isSet():
+ try:
+ if commit:
+ self.db.commitVersion(version)
+ get_transaction().note("commit version %s" % version)
+ else:
+ self.db.abortVersion(version)
+ get_transaction().note("abort version %s" % version)
+ get_transaction().commit()
+ if self.sleep:
+ time.sleep(self.sleep)
+ return commit
+ except ConflictError, msg:
+ get_transaction().abort()
+ cn.sync()
+ finally:
+ cn.close()
+ return 0
+
class InvalidationTests:
level = 2
- DELAY = 10
+ DELAY = 15
def _check_tree(self, cn, tree):
# Make sure the BTree is sane and that all the updates persisted
@@ -177,3 +251,32 @@
cn.close()
db1.close()
db2.close()
+
+ def checkConcurrentUpdatesInVersions(self):
+ self._storage = storage1 = self.openClientStorage()
+ db1 = DB(storage1)
+ db2 = DB(self.openClientStorage())
+ stop = threading.Event()
+
+ cn = db1.open()
+ tree = cn.root()["tree"] = OOBTree()
+ get_transaction().commit()
+
+ # Run three threads that update the BTree.
+ # Two of the threads share a single storage so that it
+ # is possible for both threads to read the same object
+ # at the same time.
+
+ t1 = VersionStressThread(self, db1, stop, 1, 1, 3)
+ t2 = VersionStressThread(self, db2, stop, 2, 2, 3, 0.001)
+ t3 = VersionStressThread(self, db2, stop, 3, 3, 3, 0.001)
+ self.go(stop, t1, t2, t3)
+
+ cn.sync()
+ self._check_tree(cn, tree)
+ self._check_threads(tree, t1, t2, t3)
+
+ cn.close()
+ db1.close()
+ db2.close()
+