[Zodb-checkins] CVS: ZODB3/ZEO/tests - InvalidationTests.py:1.4.2.2
Jeremy Hylton
jeremy at zope.com
Fri Sep 5 17:21:02 EDT 2003
Update of /cvs-repository/ZODB3/ZEO/tests
In directory cvs.zope.org:/tmp/cvs-serv1078/ZEO/tests
Modified Files:
Tag: ZODB3-3_2-branch
InvalidationTests.py
Log Message:
Port the modern, adaptive concurrent update tests from ZODB3-3_1-branch.
=== ZODB3/ZEO/tests/InvalidationTests.py 1.4.2.1 => 1.4.2.2 ===
--- ZODB3/ZEO/tests/InvalidationTests.py:1.4.2.1 Thu Jun 26 11:38:38 2003
+++ ZODB3/ZEO/tests/InvalidationTests.py Fri Sep 5 16:21:00 2003
@@ -12,7 +12,6 @@
#
##############################################################################
-from thread import get_ident
import threading
import time
@@ -20,12 +19,10 @@
from BTrees.OOBTree import OOBTree
from ZEO.tests.TestThread import TestThread
-from ZEO.tests.ConnectionTests import CommonSetupTearDown
from ZODB.DB import DB
from ZODB.POSException \
import ReadConflictError, ConflictError, VersionLockError
-import zLOG
# The tests here let several threads have a go at one or more database
# instances simultaneously. Each thread appends a disjoint (from the
@@ -48,8 +45,8 @@
# to 'tree' until Event stop is set. If sleep is given, sleep
# that long after each append. At the end, instance var .added_keys
# is a list of the ints the thread believes it added successfully.
- def __init__(self, testcase, db, stop, threadnum, startnum,
- step=2, sleep=None):
+ def __init__(self, testcase, db, stop, threadnum, commitdict,
+ startnum, step=2, sleep=None):
TestThread.__init__(self, testcase)
self.db = db
self.stop = stop
@@ -58,6 +55,7 @@
self.step = step
self.sleep = sleep
self.added_keys = []
+ self.commitdict = commitdict
def testrun(self):
cn = self.db.open()
@@ -74,6 +72,7 @@
tree[key] = self.threadnum
get_transaction().note("add key %s" % key)
get_transaction().commit()
+ self.commitdict[self] = 1
if self.sleep:
time.sleep(self.sleep)
except (ReadConflictError, ConflictError), msg:
@@ -88,9 +87,13 @@
key += self.step
cn.close()
-class VersionStressThread(TestThread):
+class LargeUpdatesThread(TestThread):
+
+ # A thread that performs a lot of updates. It attempts to modify
+ # more than 25 objects so that it can test code that runs vote
+ # in a separate thread when it modifies more than 25 objects.
- def __init__(self, testcase, db, stop, threadnum, startnum,
+ def __init__(self, testcase, db, stop, threadnum, commitdict, startnum,
step=2, sleep=None):
TestThread.__init__(self, testcase)
self.db = db
@@ -100,21 +103,88 @@
self.step = step
self.sleep = sleep
self.added_keys = []
+ self.commitdict = commitdict
- def log(self, msg):
- zLOG.LOG("thread %d" % get_ident(), 0, msg)
+ def testrun(self):
+ cn = self.db.open()
+ while not self.stop.isSet():
+ try:
+ tree = cn.root()["tree"]
+ break
+ except (ConflictError, KeyError):
+ # print "%d getting tree abort" % self.threadnum
+ get_transaction().abort()
+ cn.sync()
+
+ keys_added = {} # set of keys we commit
+ tkeys = []
+ while not self.stop.isSet():
+
+ # The test picks 50 keys spread across many buckets.
+ # self.startnum and self.step ensure that all threads use
+ # disjoint key sets, to minimize conflict errors.
+
+ nkeys = len(tkeys)
+ if nkeys < 50:
+ tkeys = range(self.startnum, 3000, self.step)
+ nkeys = len(tkeys)
+ step = max(int(nkeys / 50), 1)
+ keys = [tkeys[i] for i in range(0, nkeys, step)]
+ for key in keys:
+ try:
+ tree[key] = self.threadnum
+ except (ReadConflictError, ConflictError), msg:
+ # print "%d setting key %s" % (self.threadnum, msg)
+ get_transaction().abort()
+ cn.sync()
+ break
+ else:
+ # print "%d set #%d" % (self.threadnum, len(keys))
+ get_transaction().note("keys %s" % ", ".join(map(str, keys)))
+ try:
+ get_transaction().commit()
+ self.commitdict[self] = 1
+ if self.sleep:
+ time.sleep(self.sleep)
+ except ConflictError, msg:
+ # print "%d commit %s" % (self.threadnum, msg)
+ get_transaction().abort()
+ cn.sync()
+ continue
+ for k in keys:
+ tkeys.remove(k)
+ keys_added[k] = 1
+ # sync() is necessary here to process invalidations
+ # if we get a read conflict. In the read conflict case,
+ # no objects were modified so cn never got registered
+ # with the transaction.
+ cn.sync()
+ self.added_keys = keys_added.keys()
+ cn.close()
+
+class VersionStressThread(TestThread):
+
+ def __init__(self, testcase, db, stop, threadnum, commitdict, startnum,
+ step=2, sleep=None):
+ TestThread.__init__(self, testcase)
+ self.db = db
+ self.stop = stop
+ self.threadnum = threadnum
+ self.startnum = startnum
+ self.step = step
+ self.sleep = sleep
+ self.added_keys = []
+ self.commitdict = commitdict
def testrun(self):
- self.log("thread begin")
commit = 0
key = self.startnum
while not self.stop.isSet():
version = "%s:%s" % (self.threadnum, key)
commit = not commit
- self.log("attempt to add key=%s version=%s commit=%d" %
- (key, version, commit))
if self.oneupdate(version, key, commit):
self.added_keys.append(key)
+ self.commitdict[self] = 1
key += self.step
def oneupdate(self, version, key, commit=1):
@@ -134,11 +204,11 @@
while not self.stop.isSet():
try:
tree[key] = self.threadnum
- get_transaction().note("add key %d" % key)
get_transaction().commit()
+ if self.sleep:
+ time.sleep(self.sleep)
break
except (VersionLockError, ReadConflictError, ConflictError), msg:
- self.log(msg)
get_transaction().abort()
# sync() is necessary here to process invalidations
# if we get a read conflict. In the read conflict case,
@@ -161,20 +231,30 @@
time.sleep(self.sleep)
return commit
except ConflictError, msg:
- self.log(msg)
get_transaction().abort()
cn.sync()
finally:
cn.close()
return 0
-class InvalidationTests(CommonSetupTearDown):
+class InvalidationTests:
level = 2
- DELAY = 15 # number of seconds the main thread lets the workers run
+
+ # Minimum # of seconds the main thread lets the workers run. The
+ # test stops as soon as this much time has elapsed, and all threads
+ # have managed to commit a change.
+ MINTIME = 10
+
+ # Maximum # of seconds the main thread lets the workers run. We
+ # stop after this long has elapsed regardless of whether all threads
+ # have managed to commit a change.
+ MAXTIME = 300
+
+ StressThread = StressThread
def _check_tree(self, cn, tree):
- # Make sure the BTree is sane and that all the updates persisted.
+ # Make sure the BTree is sane at the C level.
retries = 3
while retries:
retries -= 1
@@ -194,28 +274,46 @@
def _check_threads(self, tree, *threads):
# Make sure the thread's view of the world is consistent with
# the actual database state.
- all_keys = []
+ expected_keys = []
+ errormsgs = []
+ err = errormsgs.append
for t in threads:
- # If the test didn't add any keys, it didn't do what we expected.
- self.assert_(t.added_keys)
- for key in t.added_keys:
- self.assert_(tree.has_key(key), key)
- all_keys.extend(t.added_keys)
- all_keys.sort()
- self.assertEqual(all_keys, list(tree.keys()))
+ if not t.added_keys:
+ err("thread %d didn't add any keys" % t.threadnum)
+ expected_keys.extend(t.added_keys)
+ expected_keys.sort()
+ actual_keys = list(tree.keys())
+ if expected_keys != actual_keys:
+ err("expected keys != actual keys")
+ for k in expected_keys:
+ if k not in actual_keys:
+ err("key %s expected but not in tree" % k)
+ for k in actual_keys:
+ if k not in expected_keys:
+ err("key %s in tree but not expected" % k)
+ if errormsgs:
+ display(tree)
+ self.fail('\n'.join(errormsgs))
- def go(self, stop, *threads):
+ def go(self, stop, commitdict, *threads):
# Run the threads
for t in threads:
t.start()
- time.sleep(self.DELAY)
+ delay = self.MINTIME
+ start = time.time()
+ while time.time() - start <= self.MAXTIME:
+ time.sleep(delay)
+ delay = 2.0
+ if len(commitdict) >= len(threads):
+ break
+ # Some thread still hasn't managed to commit anything.
stop.set()
for t in threads:
t.cleanup()
def checkConcurrentUpdates2Storages(self):
self._storage = storage1 = self.openClientStorage()
- storage2 = self.openClientStorage(cache="2")
+ storage2 = self.openClientStorage()
db1 = DB(storage1)
db2 = DB(storage2)
stop = threading.Event()
@@ -225,9 +323,10 @@
get_transaction().commit()
# Run two threads that update the BTree
- t1 = StressThread(self, db1, stop, 1, 1)
- t2 = StressThread(self, db2, stop, 2, 2)
- self.go(stop, t1, t2)
+ cd = {}
+ t1 = self.StressThread(self, db1, stop, 1, cd, 1)
+ t2 = self.StressThread(self, db2, stop, 2, cd, 2)
+ self.go(stop, cd, t1, t2)
cn.sync()
self._check_tree(cn, tree)
@@ -247,9 +346,10 @@
get_transaction().commit()
# Run two threads that update the BTree
- t1 = StressThread(self, db1, stop, 1, 1, sleep=0.001)
- t2 = StressThread(self, db1, stop, 2, 2, sleep=0.001)
- self.go(stop, t1, t2)
+ cd = {}
+ t1 = self.StressThread(self, db1, stop, 1, cd, 1, sleep=0.001)
+ t2 = self.StressThread(self, db1, stop, 2, cd, 2, sleep=0.001)
+ self.go(stop, cd, t1, t2)
cn.sync()
self._check_tree(cn, tree)
@@ -261,22 +361,23 @@
def checkConcurrentUpdates2StoragesMT(self):
self._storage = storage1 = self.openClientStorage()
db1 = DB(storage1)
+ db2 = DB(self.openClientStorage())
stop = threading.Event()
cn = db1.open()
tree = cn.root()["tree"] = OOBTree()
get_transaction().commit()
- db2 = DB(self.openClientStorage(cache="2"))
# Run three threads that update the BTree.
# Two of the threads share a single storage so that it
# is possible for both threads to read the same object
# at the same time.
- t1 = StressThread(self, db1, stop, 1, 1, 3)
- t2 = StressThread(self, db2, stop, 2, 2, 3, 0.001)
- t3 = StressThread(self, db2, stop, 3, 3, 3, 0.001)
- self.go(stop, t1, t2, t3)
+ cd = {}
+ t1 = self.StressThread(self, db1, stop, 1, cd, 1, 3)
+ t2 = self.StressThread(self, db2, stop, 2, cd, 2, 3, 0.001)
+ t3 = self.StressThread(self, db2, stop, 3, cd, 3, 3, 0.001)
+ self.go(stop, cd, t1, t2, t3)
cn.sync()
self._check_tree(cn, tree)
@@ -289,7 +390,7 @@
def checkConcurrentUpdatesInVersions(self):
self._storage = storage1 = self.openClientStorage()
db1 = DB(storage1)
- db2 = DB(self.openClientStorage(cache="2"))
+ db2 = DB(self.openClientStorage())
stop = threading.Event()
cn = db1.open()
@@ -301,10 +402,11 @@
# is possible for both threads to read the same object
# at the same time.
- t1 = VersionStressThread(self, db1, stop, 1, 1, 3)
- t2 = VersionStressThread(self, db2, stop, 2, 2, 3, 0.001)
- t3 = VersionStressThread(self, db2, stop, 3, 3, 3, 0.001)
- self.go(stop, t1, t2, t3)
+ cd = {}
+ t1 = VersionStressThread(self, db1, stop, 1, cd, 1, 3)
+ t2 = VersionStressThread(self, db2, stop, 2, cd, 2, 3, 0.001)
+ t3 = VersionStressThread(self, db2, stop, 3, cd, 3, 3, 0.001)
+ self.go(stop, cd, t1, t2, t3)
cn.sync()
self._check_tree(cn, tree)
@@ -314,3 +416,41 @@
db1.close()
db2.close()
+ def checkConcurrentLargeUpdates(self):
+ # Use 3 threads like the 2StorageMT test above.
+ self._storage = storage1 = self.openClientStorage()
+ db1 = DB(storage1)
+ db2 = DB(self.openClientStorage())
+ stop = threading.Event()
+
+ cn = db1.open()
+ tree = cn.root()["tree"] = OOBTree()
+ for i in range(0, 3000, 2):
+ tree[i] = 0
+ get_transaction().commit()
+
+ # Run three threads that update the BTree.
+ # Two of the threads share a single storage so that it
+ # is possible for both threads to read the same object
+ # at the same time.
+
+ cd = {}
+ t1 = LargeUpdatesThread(self, db1, stop, 1, cd, 1, 3, 0.001)
+ t2 = LargeUpdatesThread(self, db2, stop, 2, cd, 2, 3, 0.001)
+ t3 = LargeUpdatesThread(self, db2, stop, 3, cd, 3, 3, 0.001)
+ self.go(stop, cd, t1, t2, t3)
+
+ cn.sync()
+ self._check_tree(cn, tree)
+
+ # Purge the tree of the dummy entries mapping to 0.
+ losers = [k for k, v in tree.items() if v == 0]
+ for k in losers:
+ del tree[k]
+ get_transaction().commit()
+
+ self._check_threads(tree, t1, t2, t3)
+
+ cn.close()
+ db1.close()
+ db2.close()
More information about the Zodb-checkins mailing list