[Zope-Checkins] CVS: ZODB3/ZEO/tests -
InvalidationTests.py:1.1.4.6.4.1
Jeremy Hylton
jeremy at zope.com
Wed Aug 20 17:38:20 EDT 2003
Update of /cvs-repository/ZODB3/ZEO/tests
In directory cvs.zope.org:/tmp/cvs-serv23383/ZEO/tests
Modified Files:
Tag: ZODB3-vote-backoff-branch
InvalidationTests.py
Log Message:
Add a new concurrent updates test the modifies a large BTree.
The test is specifically designed to have competing updates large
enough to trigger the "run-vote-in-separate-thread" code in
StorageServer.py. I confirmed that it exercises that code by adding
some debugging prints.
=== ZODB3/ZEO/tests/InvalidationTests.py 1.1.4.6 => 1.1.4.6.4.1 ===
--- ZODB3/ZEO/tests/InvalidationTests.py:1.1.4.6 Thu Jun 12 21:33:52 2003
+++ ZODB3/ZEO/tests/InvalidationTests.py Wed Aug 20 16:38:19 2003
@@ -85,6 +85,73 @@
key += self.step
cn.close()
+class LargeUpdatesThread(TestThread):
+
+ # A thread that performs a lot of updates. It attempts to modify
+ # more than 25 objects so that it can test code that runs vote
+ # in a separate thread when it modifies more than 25 objects.
+
+ keys = range(3000)
+
+ def __init__(self, testcase, db, stop, threadnum, startnum,
+ step=2, sleep=None):
+ TestThread.__init__(self, testcase)
+ self.db = db
+ self.stop = stop
+ self.threadnum = threadnum
+ self.startnum = startnum
+ self.step = step
+ self.sleep = sleep
+ self.added_keys = []
+
+ def testrun(self):
+ cn = self.db.open()
+ while not self.stop.isSet():
+ try:
+ tree = cn.root()["tree"]
+ break
+ except (ConflictError, KeyError):
+ get_transaction().abort()
+ cn.sync()
+
+ tkeys = range(self.threadnum, 3000)
+ while not self.stop.isSet():
+
+ # The test picks 50 keys spread across many buckets. The
+ # three threads start with different offsets to minimize
+ # conflict errors.
+
+ nkeys = len(tkeys)
+ if nkeys < 50:
+ tkeys = range(self.threadnum, 3000)
+ nkeys = len(tkeys)
+ step = max(nkeys / 50, 1)
+ keys = [tkeys[i] for i in range(0, nkeys - 1, step)]
+ for key in keys:
+ try:
+ tree[key] = self.threadnum
+ except (ReadConflictError, ConflictError), msg:
+ cn.sync()
+ break
+ else:
+ get_transaction().note("keys %s" % ", ".join(map(str, keys)))
+ try:
+ get_transaction().commit()
+ if self.sleep:
+ time.sleep(self.sleep)
+ except ConflictError, msg:
+ get_transaction().abort()
+ continue
+ for k in keys:
+ tkeys.remove(k)
+ self.added_keys += keys
+ # sync() is necessary here to process invalidations
+ # if we get a read conflict. In the read conflict case,
+ # no objects were modified so cn never got registered
+ # with the transaction.
+ cn.sync()
+ cn.close()
+
class VersionStressThread(TestThread):
def __init__(self, testcase, db, stop, threadnum, startnum,
@@ -162,6 +229,7 @@
level = 2
DELAY = 15 # number of seconds the main thread lets the workers run
+ StressThread = StressThread
def _check_tree(self, cn, tree):
# Make sure the BTree is sane and that all the updates persisted.
@@ -215,8 +283,8 @@
get_transaction().commit()
# Run two threads that update the BTree
- t1 = StressThread(self, db1, stop, 1, 1)
- t2 = StressThread(self, db2, stop, 2, 2)
+ t1 = self.StressThread(self, db1, stop, 1, 1)
+ t2 = self.StressThread(self, db2, stop, 2, 2)
self.go(stop, t1, t2)
cn.sync()
@@ -237,8 +305,8 @@
get_transaction().commit()
# Run two threads that update the BTree
- t1 = StressThread(self, db1, stop, 1, 1, sleep=0.001)
- t2 = StressThread(self, db1, stop, 2, 2, sleep=0.001)
+ t1 = self.StressThread(self, db1, stop, 1, 1, sleep=0.001)
+ t2 = self.StressThread(self, db1, stop, 2, 2, sleep=0.001)
self.go(stop, t1, t2)
cn.sync()
@@ -263,9 +331,9 @@
# is possible for both threads to read the same object
# at the same time.
- t1 = StressThread(self, db1, stop, 1, 1, 3)
- t2 = StressThread(self, db2, stop, 2, 2, 3, 0.001)
- t3 = StressThread(self, db2, stop, 3, 3, 3, 0.001)
+ t1 = self.StressThread(self, db1, stop, 1, 1, 3)
+ t2 = self.StressThread(self, db2, stop, 2, 2, 3, 0.001)
+ t3 = self.StressThread(self, db2, stop, 3, 3, 3, 0.001)
self.go(stop, t1, t2, t3)
cn.sync()
@@ -299,6 +367,46 @@
cn.sync()
self._check_tree(cn, tree)
self._check_threads(tree, t1, t2, t3)
+
+ cn.close()
+ db1.close()
+ db2.close()
+
+ def checkConcurrentLargeUpdates(self):
+ # Use 3 threads like the 2StorageMT test above.
+ self._storage = storage1 = self.openClientStorage()
+ db1 = DB(storage1)
+ db2 = DB(self.openClientStorage())
+ stop = threading.Event()
+
+ cn = db1.open()
+ tree = cn.root()["tree"] = OOBTree()
+ for i in range(0, 3000, 2):
+ tree[i] = 0
+ get_transaction().commit()
+
+ # Run three threads that update the BTree.
+ # Two of the threads share a single storage so that it
+ # is possible for both threads to read the same object
+ # at the same time.
+
+ t1 = LargeUpdatesThread(self, db1, stop, 1, 1, 3, 0.001)
+ t2 = LargeUpdatesThread(self, db2, stop, 2, 2, 3, 0.001)
+ t3 = LargeUpdatesThread(self, db2, stop, 3, 3, 3, 0.001)
+ self.go(stop, t1, t2, t3)
+
+ cn.sync()
+ self._check_tree(cn, tree)
+
+ # check the threads differently here than in the other tests
+ L = [None, t1, t2, t3]
+ for t in t1, t2, t3:
+ self.assert_(t.added_keys)
+ for k in t.added_keys:
+ self.assert_(tree[k] != 0)
+ if tree[k] != t.threadnum:
+ self.assert_(k in L[tree[k]].added_keys)
+
cn.close()
db1.close()
More information about the Zope-Checkins
mailing list