[Zope-Checkins] CVS: ZODB3/ZEO/tests - zeoserver.py:1.19.6.1
testZEO.py:1.72.6.1 testConnection.py:1.16.4.1
testClientCache.py:1.9.4.1 InvalidationTests.py:1.4.4.1
ConnectionTests.py:1.40.4.1 CommitLockTests.py:1.12.18.2
Jeremy Hylton
jeremy at zope.com
Mon Sep 15 14:03:30 EDT 2003
Update of /cvs-repository/ZODB3/ZEO/tests
In directory cvs.zope.org:/tmp/cvs-serv13599/ZEO/tests
Modified Files:
Tag: Zope-2_7-branch
zeoserver.py testZEO.py testConnection.py testClientCache.py
InvalidationTests.py ConnectionTests.py CommitLockTests.py
Log Message:
Take two: Merge changes from ZODB3-3_2-branch to Zope-2_7-branch.
Please make all future changes on the Zope-2_7-branch instead.
The previous attempt used "cvs up -j ZODB3-3_2-branch", but appeared
to get only a small fraction of the changes. This attempt is based on
copying a checkout of ZODB3-3_2-branch over top of a checkout of
Zope-2_7-branch.
=== ZODB3/ZEO/tests/zeoserver.py 1.19 => 1.19.6.1 ===
--- ZODB3/ZEO/tests/zeoserver.py:1.19 Thu Jun 5 18:39:01 2003
+++ ZODB3/ZEO/tests/zeoserver.py Mon Sep 15 14:02:59 2003
@@ -122,9 +122,9 @@
self._adminaddr = addr
def run(self):
- # If this process doesn't exit in 100 seconds, commit suicide
- for i in range(20):
- time.sleep(5)
+ # If this process doesn't exit in 300 seconds, commit suicide
+ time.sleep(300)
+ log("zeoserver", "suicide thread invoking shutdown")
from ZEO.tests.forker import shutdown_zeo_server
# XXX If the -k option was given to zeoserver, then the process will
# go away but the temp files won't get cleaned up.
@@ -174,7 +174,7 @@
transaction_timeout=zo.transaction_timeout,
monitor_address=mon_addr,
auth_protocol=zo.auth_protocol,
- auth_filename=zo.auth_database,
+ auth_database=zo.auth_database,
auth_realm=zo.auth_realm)
try:
=== ZODB3/ZEO/tests/testZEO.py 1.72 => 1.72.6.1 ===
--- ZODB3/ZEO/tests/testZEO.py:1.72 Fri May 30 15:20:56 2003
+++ ZODB3/ZEO/tests/testZEO.py Mon Sep 15 14:02:59 2003
@@ -162,6 +162,10 @@
if hasattr(ZODB, "__version__"):
ReadOnlyStorage.ReadOnlyStorage.checkWriteMethods(self)
+ def checkSortKey(self):
+ key = '%s:%s' % (self._storage._storage, self._storage._server_addr)
+ self.assertEqual(self._storage.sortKey(), key)
+
class FileStorageTests(GenericTests):
"""Test ZEO backed by a FileStorage."""
@@ -183,7 +187,7 @@
self._envdir = tempfile.mktemp()
return """\
<fullstorage 1>
- name %s
+ envdir %s
</fullstorage>
""" % self._envdir
=== ZODB3/ZEO/tests/testConnection.py 1.16 => 1.16.4.1 ===
--- ZODB3/ZEO/tests/testConnection.py:1.16 Fri Jun 13 18:27:33 2003
+++ ZODB3/ZEO/tests/testConnection.py Mon Sep 15 14:02:59 2003
@@ -38,7 +38,7 @@
def getConfig(self, path, create, read_only):
return """\
<fullstorage 1>
- name %s
+ envdir %s
read-only %s
</fullstorage>""" % (path, read_only and "yes" or "no")
@@ -57,19 +57,25 @@
class FileStorageReconnectionTests(
FileStorageConfig,
- ConnectionTests.ReconnectionTests
+ ConnectionTests.ReconnectionTests,
):
"""FileStorage-specific re-connection tests."""
# Run this at level 1 because MappingStorage can't do reconnection tests
level = 1
+class FileStorageInvqTests(
+ FileStorageConfig,
+ ConnectionTests.InvqTests
+ ):
+ """FileStorage-specific invalidation queue tests."""
+ level = 1
+
class FileStorageTimeoutTests(
FileStorageConfig,
ConnectionTests.TimeoutTests
):
level = 2
-
class BDBConnectionTests(
BerkeleyStorageConfig,
ConnectionTests.ConnectionTests,
@@ -85,6 +91,13 @@
"""Berkeley storage re-connection tests."""
level = 2
+class BDBInvqTests(
+ BerkeleyStorageConfig,
+ ConnectionTests.InvqTests
+ ):
+ """Berkeley storage invalidation queue tests."""
+ level = 2
+
class BDBTimeoutTests(
BerkeleyStorageConfig,
ConnectionTests.TimeoutTests
@@ -112,22 +125,19 @@
test_classes = [FileStorageConnectionTests,
FileStorageReconnectionTests,
+ FileStorageInvqTests,
FileStorageTimeoutTests,
MappingStorageConnectionTests,
MappingStorageTimeoutTests]
import BDBStorage
if BDBStorage.is_available:
- test_classes.append(BDBConnectionTests)
- test_classes.append(BDBReconnectionTests)
- test_classes.append(BDBTimeoutTests)
-
+ test_classes += [BDBConnectionTests,
+ BDBReconnectionTests,
+ BDBInvqTests,
+ BDBTimeoutTests]
def test_suite():
- # shutup warnings about mktemp
- import warnings
- warnings.filterwarnings("ignore", "mktemp")
-
suite = unittest.TestSuite()
for klass in test_classes:
sub = unittest.makeSuite(klass, 'check')
=== ZODB3/ZEO/tests/testClientCache.py 1.9 => 1.9.4.1 ===
--- ZODB3/ZEO/tests/testClientCache.py:1.9 Mon Jun 16 14:50:06 2003
+++ ZODB3/ZEO/tests/testClientCache.py Mon Sep 15 14:02:59 2003
@@ -32,14 +32,12 @@
_oid3 = 'cdefghij'
def setUp(self):
- unittest.TestCase.setUp(self)
self.cachesize = 10*1000*1000
self.cache = ClientCache(size=self.cachesize)
self.cache.open()
def tearDown(self):
self.cache.close()
- unittest.TestCase.tearDown(self)
def testOpenClose(self):
pass # All the work is done by setUp() / tearDown()
@@ -281,9 +279,10 @@
class PersistentClientCacheTests(unittest.TestCase):
_oid = 'abcdefgh'
+ _oid2 = 'bcdefghi'
+ _oid3 = 'cdefghij'
def setUp(self):
- unittest.TestCase.setUp(self)
self.vardir = os.getcwd() # Don't use /tmp, it's a security risk
self.cachesize = 10*1000*1000
self.storagename = 'foo'
@@ -319,7 +318,6 @@
os.unlink(filename)
except os.error:
pass
- unittest.TestCase.tearDown(self)
def testCacheFileSelection(self):
# A bug in __init__ read the wrong slice of the file to determine
@@ -388,7 +386,42 @@
cache.checkSize(10*self.cachesize) # Force a file flip
self.failUnless(cache.getLastTid() is None)
-
+ def testLoadNonversionWithVersionInFlippedCache(self):
+ # This test provokes an error seen once in an unrelated test.
+ # The object is stored in the old cache file with version data,
+ # a load for non-version data occurs. The attempt to copy the
+ # non-version data to the new file fails.
+ nvdata = "Mend your speech a little, lest it may mar your fortunes."
+ nvserial = "12345678"
+ version = "folio"
+ vdata = "Mend your speech a little, lest you may mar your fortunes."
+ vserial = "12346789"
+
+ self.cache.store(self._oid, nvdata, nvserial, version, vdata, vserial)
+ self.cache.checkSize(10 * self.cachesize) # force a cache flip
+
+ for i in 1, 2: # check that we can load before and after copying
+ for xversion, xdata, xserial in [("", nvdata, nvserial),
+ (version, vdata, vserial)]:
+ data, serial = self.cache.load(self._oid, xversion)
+ self.assertEqual(data, xdata)
+ self.assertEqual(serial, xserial)
+
+ # now cause two more cache flips and make sure the data is still there
+ self.cache.store(self._oid2, "", "", "foo", "bar", "23456789")
+ self.cache.checkSize(10 * self.cachesize) # force a cache flip
+ self.cache.load(self._oid, "")
+ self.cache.store(self._oid3, "bar", "34567890", "", "", "")
+ self.cache.checkSize(10 * self.cachesize) # force a cache flip
+ self.cache.load(self._oid, "")
+
+ for i in 1, 2: # check that we can load before and after copying
+ for xversion, xdata, xserial in [("", nvdata, nvserial),
+ (version, vdata, vserial)]:
+ data, serial = self.cache.load(self._oid, xversion)
+ self.assertEqual(data, xdata)
+ self.assertEqual(serial, xserial)
+
class ClientCacheLongOIDTests(ClientCacheTests):
_oid = 'abcdefghijklmnop' * 2
_oid2 = 'bcdefghijklmnopq' * 2
@@ -397,7 +430,8 @@
class PersistentClientCacheLongOIDTests(PersistentClientCacheTests):
_oid = 'abcdefghijklmnop' * 2
-
+ _oid2 = 'bcdefghijklmnopq' * 2
+ _oid3 = 'cdefghijklmnopqr' * 2
def test_suite():
suite = unittest.TestSuite()
=== ZODB3/ZEO/tests/InvalidationTests.py 1.4 => 1.4.4.1 ===
--- ZODB3/ZEO/tests/InvalidationTests.py:1.4 Fri Jun 13 19:09:30 2003
+++ ZODB3/ZEO/tests/InvalidationTests.py Mon Sep 15 14:02:59 2003
@@ -12,7 +12,6 @@
#
##############################################################################
-from thread import get_ident
import threading
import time
@@ -20,12 +19,10 @@
from BTrees.OOBTree import OOBTree
from ZEO.tests.TestThread import TestThread
-from ZEO.tests.ConnectionTests import CommonSetupTearDown
from ZODB.DB import DB
from ZODB.POSException \
import ReadConflictError, ConflictError, VersionLockError
-import zLOG
# The tests here let several threads have a go at one or more database
# instances simultaneously. Each thread appends a disjoint (from the
@@ -48,8 +45,8 @@
# to 'tree' until Event stop is set. If sleep is given, sleep
# that long after each append. At the end, instance var .added_keys
# is a list of the ints the thread believes it added successfully.
- def __init__(self, testcase, db, stop, threadnum, startnum,
- step=2, sleep=None):
+ def __init__(self, testcase, db, stop, threadnum, commitdict,
+ startnum, step=2, sleep=None):
TestThread.__init__(self, testcase)
self.db = db
self.stop = stop
@@ -58,6 +55,7 @@
self.step = step
self.sleep = sleep
self.added_keys = []
+ self.commitdict = commitdict
def testrun(self):
cn = self.db.open()
@@ -74,6 +72,7 @@
tree[key] = self.threadnum
get_transaction().note("add key %s" % key)
get_transaction().commit()
+ self.commitdict[self] = 1
if self.sleep:
time.sleep(self.sleep)
except (ReadConflictError, ConflictError), msg:
@@ -88,9 +87,13 @@
key += self.step
cn.close()
-class VersionStressThread(TestThread):
+class LargeUpdatesThread(TestThread):
+
+ # A thread that performs a lot of updates. It attempts to modify
+ # more than 25 objects so that it can test code that runs vote
+ # in a separate thread when it modifies more than 25 objects.
- def __init__(self, testcase, db, stop, threadnum, startnum,
+ def __init__(self, testcase, db, stop, threadnum, commitdict, startnum,
step=2, sleep=None):
TestThread.__init__(self, testcase)
self.db = db
@@ -100,21 +103,88 @@
self.step = step
self.sleep = sleep
self.added_keys = []
+ self.commitdict = commitdict
+
+ def testrun(self):
+ cn = self.db.open()
+ while not self.stop.isSet():
+ try:
+ tree = cn.root()["tree"]
+ break
+ except (ConflictError, KeyError):
+ # print "%d getting tree abort" % self.threadnum
+ get_transaction().abort()
+ cn.sync()
+
+ keys_added = {} # set of keys we commit
+ tkeys = []
+ while not self.stop.isSet():
+
+ # The test picks 50 keys spread across many buckets.
+ # self.startnum and self.step ensure that all threads use
+ # disjoint key sets, to minimize conflict errors.
+
+ nkeys = len(tkeys)
+ if nkeys < 50:
+ tkeys = range(self.startnum, 3000, self.step)
+ nkeys = len(tkeys)
+ step = max(int(nkeys / 50), 1)
+ keys = [tkeys[i] for i in range(0, nkeys, step)]
+ for key in keys:
+ try:
+ tree[key] = self.threadnum
+ except (ReadConflictError, ConflictError), msg:
+ # print "%d setting key %s" % (self.threadnum, msg)
+ get_transaction().abort()
+ cn.sync()
+ break
+ else:
+ # print "%d set #%d" % (self.threadnum, len(keys))
+ get_transaction().note("keys %s" % ", ".join(map(str, keys)))
+ try:
+ get_transaction().commit()
+ self.commitdict[self] = 1
+ if self.sleep:
+ time.sleep(self.sleep)
+ except ConflictError, msg:
+ # print "%d commit %s" % (self.threadnum, msg)
+ get_transaction().abort()
+ cn.sync()
+ continue
+ for k in keys:
+ tkeys.remove(k)
+ keys_added[k] = 1
+ # sync() is necessary here to process invalidations
+ # if we get a read conflict. In the read conflict case,
+ # no objects were modified so cn never got registered
+ # with the transaction.
+ cn.sync()
+ self.added_keys = keys_added.keys()
+ cn.close()
- def log(self, msg):
- zLOG.LOG("thread %d" % get_ident(), 0, msg)
+class VersionStressThread(TestThread):
+
+ def __init__(self, testcase, db, stop, threadnum, commitdict, startnum,
+ step=2, sleep=None):
+ TestThread.__init__(self, testcase)
+ self.db = db
+ self.stop = stop
+ self.threadnum = threadnum
+ self.startnum = startnum
+ self.step = step
+ self.sleep = sleep
+ self.added_keys = []
+ self.commitdict = commitdict
def testrun(self):
- self.log("thread begin")
commit = 0
key = self.startnum
while not self.stop.isSet():
version = "%s:%s" % (self.threadnum, key)
commit = not commit
- self.log("attempt to add key=%s version=%s commit=%d" %
- (key, version, commit))
if self.oneupdate(version, key, commit):
self.added_keys.append(key)
+ self.commitdict[self] = 1
key += self.step
def oneupdate(self, version, key, commit=1):
@@ -134,13 +204,11 @@
while not self.stop.isSet():
try:
tree[key] = self.threadnum
- get_transaction().note("add key %d" % key)
get_transaction().commit()
if self.sleep:
time.sleep(self.sleep)
break
except (VersionLockError, ReadConflictError, ConflictError), msg:
- self.log(msg)
get_transaction().abort()
# sync() is necessary here to process invalidations
# if we get a read conflict. In the read conflict case,
@@ -163,20 +231,30 @@
time.sleep(self.sleep)
return commit
except ConflictError, msg:
- self.log(msg)
get_transaction().abort()
cn.sync()
finally:
cn.close()
return 0
-class InvalidationTests(CommonSetupTearDown):
+class InvalidationTests:
level = 2
- DELAY = 15 # number of seconds the main thread lets the workers run
+
+ # Minimum # of seconds the main thread lets the workers run. The
+ # test stops as soon as this much time has elapsed, and all threads
+ # have managed to commit a change.
+ MINTIME = 10
+
+ # Maximum # of seconds the main thread lets the workers run. We
+ # stop after this long has elapsed regardless of whether all threads
+ # have managed to commit a change.
+ MAXTIME = 300
+
+ StressThread = StressThread
def _check_tree(self, cn, tree):
- # Make sure the BTree is sane and that all the updates persisted.
+ # Make sure the BTree is sane at the C level.
retries = 3
while retries:
retries -= 1
@@ -196,28 +274,46 @@
def _check_threads(self, tree, *threads):
# Make sure the thread's view of the world is consistent with
# the actual database state.
- all_keys = []
+ expected_keys = []
+ errormsgs = []
+ err = errormsgs.append
for t in threads:
- # If the test didn't add any keys, it didn't do what we expected.
- self.assert_(t.added_keys)
- for key in t.added_keys:
- self.assert_(tree.has_key(key), key)
- all_keys.extend(t.added_keys)
- all_keys.sort()
- self.assertEqual(all_keys, list(tree.keys()))
+ if not t.added_keys:
+ err("thread %d didn't add any keys" % t.threadnum)
+ expected_keys.extend(t.added_keys)
+ expected_keys.sort()
+ actual_keys = list(tree.keys())
+ if expected_keys != actual_keys:
+ err("expected keys != actual keys")
+ for k in expected_keys:
+ if k not in actual_keys:
+ err("key %s expected but not in tree" % k)
+ for k in actual_keys:
+ if k not in expected_keys:
+ err("key %s in tree but not expected" % k)
+ if errormsgs:
+ display(tree)
+ self.fail('\n'.join(errormsgs))
- def go(self, stop, *threads):
+ def go(self, stop, commitdict, *threads):
# Run the threads
for t in threads:
t.start()
- time.sleep(self.DELAY)
+ delay = self.MINTIME
+ start = time.time()
+ while time.time() - start <= self.MAXTIME:
+ time.sleep(delay)
+ delay = 2.0
+ if len(commitdict) >= len(threads):
+ break
+ # Some thread still hasn't managed to commit anything.
stop.set()
for t in threads:
t.cleanup()
def checkConcurrentUpdates2Storages(self):
self._storage = storage1 = self.openClientStorage()
- storage2 = self.openClientStorage(cache="2")
+ storage2 = self.openClientStorage()
db1 = DB(storage1)
db2 = DB(storage2)
stop = threading.Event()
@@ -227,9 +323,10 @@
get_transaction().commit()
# Run two threads that update the BTree
- t1 = StressThread(self, db1, stop, 1, 1)
- t2 = StressThread(self, db2, stop, 2, 2)
- self.go(stop, t1, t2)
+ cd = {}
+ t1 = self.StressThread(self, db1, stop, 1, cd, 1)
+ t2 = self.StressThread(self, db2, stop, 2, cd, 2)
+ self.go(stop, cd, t1, t2)
cn.sync()
self._check_tree(cn, tree)
@@ -249,9 +346,10 @@
get_transaction().commit()
# Run two threads that update the BTree
- t1 = StressThread(self, db1, stop, 1, 1, sleep=0.001)
- t2 = StressThread(self, db1, stop, 2, 2, sleep=0.001)
- self.go(stop, t1, t2)
+ cd = {}
+ t1 = self.StressThread(self, db1, stop, 1, cd, 1, sleep=0.001)
+ t2 = self.StressThread(self, db1, stop, 2, cd, 2, sleep=0.001)
+ self.go(stop, cd, t1, t2)
cn.sync()
self._check_tree(cn, tree)
@@ -263,22 +361,23 @@
def checkConcurrentUpdates2StoragesMT(self):
self._storage = storage1 = self.openClientStorage()
db1 = DB(storage1)
+ db2 = DB(self.openClientStorage())
stop = threading.Event()
cn = db1.open()
tree = cn.root()["tree"] = OOBTree()
get_transaction().commit()
- db2 = DB(self.openClientStorage(cache="2"))
# Run three threads that update the BTree.
# Two of the threads share a single storage so that it
# is possible for both threads to read the same object
# at the same time.
- t1 = StressThread(self, db1, stop, 1, 1, 3)
- t2 = StressThread(self, db2, stop, 2, 2, 3, 0.001)
- t3 = StressThread(self, db2, stop, 3, 3, 3, 0.001)
- self.go(stop, t1, t2, t3)
+ cd = {}
+ t1 = self.StressThread(self, db1, stop, 1, cd, 1, 3)
+ t2 = self.StressThread(self, db2, stop, 2, cd, 2, 3, 0.001)
+ t3 = self.StressThread(self, db2, stop, 3, cd, 3, 3, 0.001)
+ self.go(stop, cd, t1, t2, t3)
cn.sync()
self._check_tree(cn, tree)
@@ -291,7 +390,7 @@
def checkConcurrentUpdatesInVersions(self):
self._storage = storage1 = self.openClientStorage()
db1 = DB(storage1)
- db2 = DB(self.openClientStorage(cache="2"))
+ db2 = DB(self.openClientStorage())
stop = threading.Event()
cn = db1.open()
@@ -303,10 +402,11 @@
# is possible for both threads to read the same object
# at the same time.
- t1 = VersionStressThread(self, db1, stop, 1, 1, 3)
- t2 = VersionStressThread(self, db2, stop, 2, 2, 3, 0.001)
- t3 = VersionStressThread(self, db2, stop, 3, 3, 3, 0.001)
- self.go(stop, t1, t2, t3)
+ cd = {}
+ t1 = VersionStressThread(self, db1, stop, 1, cd, 1, 3)
+ t2 = VersionStressThread(self, db2, stop, 2, cd, 2, 3, 0.001)
+ t3 = VersionStressThread(self, db2, stop, 3, cd, 3, 3, 0.001)
+ self.go(stop, cd, t1, t2, t3)
cn.sync()
self._check_tree(cn, tree)
@@ -316,3 +416,41 @@
db1.close()
db2.close()
+ def checkConcurrentLargeUpdates(self):
+ # Use 3 threads like the 2StorageMT test above.
+ self._storage = storage1 = self.openClientStorage()
+ db1 = DB(storage1)
+ db2 = DB(self.openClientStorage())
+ stop = threading.Event()
+
+ cn = db1.open()
+ tree = cn.root()["tree"] = OOBTree()
+ for i in range(0, 3000, 2):
+ tree[i] = 0
+ get_transaction().commit()
+
+ # Run three threads that update the BTree.
+ # Two of the threads share a single storage so that it
+ # is possible for both threads to read the same object
+ # at the same time.
+
+ cd = {}
+ t1 = LargeUpdatesThread(self, db1, stop, 1, cd, 1, 3, 0.001)
+ t2 = LargeUpdatesThread(self, db2, stop, 2, cd, 2, 3, 0.001)
+ t3 = LargeUpdatesThread(self, db2, stop, 3, cd, 3, 3, 0.001)
+ self.go(stop, cd, t1, t2, t3)
+
+ cn.sync()
+ self._check_tree(cn, tree)
+
+ # Purge the tree of the dummy entries mapping to 0.
+ losers = [k for k, v in tree.items() if v == 0]
+ for k in losers:
+ del tree[k]
+ get_transaction().commit()
+
+ self._check_threads(tree, t1, t2, t3)
+
+ cn.close()
+ db1.close()
+ db2.close()
=== ZODB3/ZEO/tests/ConnectionTests.py 1.40 => 1.40.4.1 ===
--- ZODB3/ZEO/tests/ConnectionTests.py:1.40 Mon Jun 16 17:04:38 2003
+++ ZODB3/ZEO/tests/ConnectionTests.py Mon Sep 15 14:02:59 2003
@@ -29,15 +29,17 @@
from ZEO.ClientStorage import ClientStorage
from ZEO.Exceptions import ClientDisconnected
from ZEO.zrpc.marshal import Marshaller
+from ZEO.zrpc.error import DisconnectedError
from ZEO.tests import forker
from ZODB.DB import DB
from ZODB.Transaction import get_transaction, Transaction
-from ZODB.POSException import ReadOnlyError
+from ZODB.POSException import ReadOnlyError, ConflictError
from ZODB.tests.StorageTestBase import StorageTestBase
from ZODB.tests.MinPO import MinPO
from ZODB.tests.StorageTestBase import zodb_pickle, zodb_unpickle
from ZODB.tests.StorageTestBase import handle_all_serials, ZERO
+from ZODB.tests.StorageTestBase import handle_serials
class TestClientStorage(ClientStorage):
@@ -98,6 +100,8 @@
if getattr(self, '_storage', None) is not None:
self._storage.close()
if hasattr(self._storage, 'cleanup'):
+ zLOG.LOG("testZEO", zLOG.DEBUG, "cleanup storage %s" %
+ self._storage.__name__)
self._storage.cleanup()
for adminaddr in self._servers:
if adminaddr is not None:
@@ -139,9 +143,14 @@
def getConfig(self, path, create, read_only):
raise NotImplementedError
- def openClientStorage(self, cache='', cache_size=200000, wait=1,
+ cache_id = 1
+
+ def openClientStorage(self, cache=None, cache_size=200000, wait=1,
read_only=0, read_only_fallback=0,
username=None, password=None, realm=None):
+ if cache is None:
+ cache = str(self.__class__.cache_id)
+ self.__class__.cache_id += 1
self.caches.append(cache)
storage = TestClientStorage(self.addr,
client=cache,
@@ -564,6 +573,70 @@
db2.close()
db1.close()
+class InvqTests(CommonSetupTearDown):
+ invq = 2
+
+ def checkQuickVerificationWith2Clients(self):
+ perstorage = self.openClientStorage(cache="test")
+ self.assertEqual(perstorage.verify_result, "full verification")
+
+ self._storage = self.openClientStorage()
+ oid = self._storage.new_oid()
+ # When we create a new storage, it should always do a full
+ # verification
+ self.assertEqual(self._storage.verify_result, "full verification")
+ # do two storages of the object to make sure an invalidation
+ # message is generated
+ revid = self._dostore(oid)
+ revid = self._dostore(oid, revid)
+
+ perstorage.load(oid, '')
+ perstorage.close()
+
+ revid = self._dostore(oid, revid)
+
+ perstorage = self.openClientStorage(cache="test")
+ self.assertEqual(perstorage.verify_result, "quick verification")
+
+ self.assertEqual(perstorage.load(oid, ''),
+ self._storage.load(oid, ''))
+ perstorage.close()
+
+ def checkVerificationWith2ClientsInvqOverflow(self):
+ perstorage = self.openClientStorage(cache="test")
+ self.assertEqual(perstorage.verify_result, "full verification")
+
+ self._storage = self.openClientStorage()
+ oid = self._storage.new_oid()
+ # When we create a new storage, it should always do a full
+ # verification
+ self.assertEqual(self._storage.verify_result, "full verification")
+ # do two storages of the object to make sure an invalidation
+ # message is generated
+ revid = self._dostore(oid)
+ revid = self._dostore(oid, revid)
+
+ perstorage.load(oid, '')
+ perstorage.close()
+
+ # the test code sets invq bound to 2
+ for i in range(5):
+ revid = self._dostore(oid, revid)
+
+ perstorage = self.openClientStorage(cache="test")
+ self.assertEqual(perstorage.verify_result, "full verification")
+ t = time.time() + 30
+ while not perstorage.end_verify.isSet():
+ perstorage.sync()
+ if time.time() > t:
+ self.fail("timed out waiting for endVerify")
+
+ self.assertEqual(self._storage.load(oid, '')[1], revid)
+ self.assertEqual(perstorage.load(oid, ''),
+ self._storage.load(oid, ''))
+
+ perstorage.close()
+
class ReconnectionTests(CommonSetupTearDown):
# The setUp() starts a server automatically. In order for its
# state to persist, we set the class variable keep to 1. In
@@ -686,7 +759,7 @@
self._newAddr()
# Start a read-only server
- self.startServer(create=0, index=0, read_only=1)
+ self.startServer(create=0, index=0, read_only=1, keep=0)
# Start a client in fallback mode
self._storage = self.openClientStorage(read_only_fallback=1)
# Stores should fail here
@@ -754,69 +827,6 @@
perstorage.close()
self._storage.close()
- def checkQuickVerificationWith2Clients(self):
- perstorage = self.openClientStorage(cache="test")
- self.assertEqual(perstorage.verify_result, "full verification")
-
- self._storage = self.openClientStorage()
- oid = self._storage.new_oid()
- # When we create a new storage, it should always do a full
- # verification
- self.assertEqual(self._storage.verify_result, "full verification")
- # do two storages of the object to make sure an invalidation
- # message is generated
- revid = self._dostore(oid)
- revid = self._dostore(oid, revid)
-
- perstorage.load(oid, '')
- perstorage.close()
-
- revid = self._dostore(oid, revid)
-
- perstorage = self.openClientStorage(cache="test")
- self.assertEqual(perstorage.verify_result, "quick verification")
-
- self.assertEqual(perstorage.load(oid, ''),
- self._storage.load(oid, ''))
- perstorage.close()
-
-
-
- def checkVerificationWith2ClientsInvqOverflow(self):
- perstorage = self.openClientStorage(cache="test")
- self.assertEqual(perstorage.verify_result, "full verification")
-
- self._storage = self.openClientStorage()
- oid = self._storage.new_oid()
- # When we create a new storage, it should always do a full
- # verification
- self.assertEqual(self._storage.verify_result, "full verification")
- # do two storages of the object to make sure an invalidation
- # message is generated
- revid = self._dostore(oid)
- revid = self._dostore(oid, revid)
-
- perstorage.load(oid, '')
- perstorage.close()
-
- # the test code sets invq bound to 2
- for i in range(5):
- revid = self._dostore(oid, revid)
-
- perstorage = self.openClientStorage(cache="test")
- self.assertEqual(perstorage.verify_result, "full verification")
- t = time.time() + 30
- while not perstorage.end_verify.isSet():
- perstorage.sync()
- if time.time() > t:
- self.fail("timed out waiting for endVerify")
-
- self.assertEqual(self._storage.load(oid, '')[1], revid)
- self.assertEqual(perstorage.load(oid, ''),
- self._storage.load(oid, ''))
-
- perstorage.close()
-
class TimeoutTests(CommonSetupTearDown):
timeout = 1
@@ -843,6 +853,103 @@
storage.tpc_begin(txn)
storage.tpc_abort(txn)
storage.close()
+
+ def checkTimeoutAfterVote(self):
+ raises = self.assertRaises
+ unless = self.failUnless
+ self._storage = storage = self.openClientStorage()
+ # Assert that the zeo cache is empty
+ unless(not storage._cache._index)
+ # Create the object
+ oid = storage.new_oid()
+ obj = MinPO(7)
+ ZERO = '\0'*8
+ # Now do a store, sleeping before the finish so as to cause a timeout
+ t = Transaction()
+ storage.tpc_begin(t)
+ revid1 = storage.store(oid, ZERO, zodb_pickle(obj), '', t)
+ storage.tpc_vote(t)
+ # Now sleep long enough for the storage to time out
+ time.sleep(3)
+ storage.sync()
+ unless(not storage.is_connected())
+ storage._wait()
+ unless(storage.is_connected())
+ # We expect finish to fail
+ raises(ClientDisconnected, storage.tpc_finish, t)
+ # The cache should still be empty
+ unless(not storage._cache._index)
+ # Load should fail since the object should not be in either the cache
+ # or the server.
+ raises(KeyError, storage.load, oid, '')
+
+ def checkTimeoutProvokingConflicts(self):
+ eq = self.assertEqual
+ raises = self.assertRaises
+ unless = self.failUnless
+ self._storage = storage = self.openClientStorage()
+ # Assert that the zeo cache is empty
+ unless(not storage._cache._index)
+ # Create the object
+ oid = storage.new_oid()
+ obj = MinPO(7)
+ ZERO = '\0'*8
+ # We need to successfully commit an object now so we have something to
+ # conflict about.
+ t = Transaction()
+ storage.tpc_begin(t)
+ revid1a = storage.store(oid, ZERO, zodb_pickle(obj), '', t)
+ revid1b = storage.tpc_vote(t)
+ revid1 = handle_serials(oid, revid1a, revid1b)
+ storage.tpc_finish(t)
+ # Now do a store, sleeping before the finish so as to cause a timeout
+ obj.value = 8
+ t = Transaction()
+ storage.tpc_begin(t)
+ revid2a = storage.store(oid, revid1, zodb_pickle(obj), '', t)
+ revid2b = storage.tpc_vote(t)
+ revid2 = handle_serials(oid, revid2a, revid2b)
+ # Now sleep long enough for the storage to time out
+ time.sleep(3)
+ storage.sync()
+ unless(not storage.is_connected())
+ storage._wait()
+ unless(storage.is_connected())
+ # We expect finish to fail
+ raises(ClientDisconnected, storage.tpc_finish, t)
+ # Now we think we've committed the second transaction, but we really
+ # haven't. A third one should produce a POSKeyError on the server,
+ # which manifests as a ConflictError on the client.
+ obj.value = 9
+ t = Transaction()
+ storage.tpc_begin(t)
+ storage.store(oid, revid2, zodb_pickle(obj), '', t)
+ raises(ConflictError, storage.tpc_vote, t)
+ # Even aborting won't help
+ storage.tpc_abort(t)
+ storage.tpc_finish(t)
+ # Try again
+ obj.value = 10
+ t = Transaction()
+ storage.tpc_begin(t)
+ storage.store(oid, revid2, zodb_pickle(obj), '', t)
+ # Even aborting won't help
+ raises(ConflictError, storage.tpc_vote, t)
+ # Abort this one and try a transaction that should succeed
+ storage.tpc_abort(t)
+ storage.tpc_finish(t)
+ # Now do a store, sleeping before the finish so as to cause a timeout
+ obj.value = 11
+ t = Transaction()
+ storage.tpc_begin(t)
+ revid2a = storage.store(oid, revid1, zodb_pickle(obj), '', t)
+ revid2b = storage.tpc_vote(t)
+ revid2 = handle_serials(oid, revid2a, revid2b)
+ storage.tpc_finish(t)
+ # Now load the object and verify that it has a value of 11
+ data, revid = storage.load(oid, '')
+ eq(zodb_unpickle(data), MinPO(11))
+ eq(revid, revid2)
class MSTThread(threading.Thread):
=== ZODB3/ZEO/tests/CommitLockTests.py 1.12.18.1 => 1.12.18.2 ===
--- ZODB3/ZEO/tests/CommitLockTests.py:1.12.18.1 Mon Jul 21 12:37:15 2003
+++ ZODB3/ZEO/tests/CommitLockTests.py Mon Sep 15 14:02:59 2003
@@ -27,7 +27,7 @@
ZERO = '\0'*8
class DummyDB:
- def invalidate(self, *args):
+ def invalidate(self, *args, **kwargs):
pass
class WorkerThread(TestThread):
@@ -78,12 +78,11 @@
class CommitLockTests:
- NUM_CLIENTS = 5
+ NUM_CLIENTS = 20
# The commit lock tests verify that the storage successfully
# blocks and restarts transactions when there is contention for a
- # single storage. There are a lot of cases to cover. transaction
- # has finished.
+ # single storage. There are a lot of cases to cover.
# The general flow of these tests is to start a transaction by
# getting far enough into 2PC to acquire the commit lock. Then
@@ -205,7 +204,8 @@
def _begin_threads(self):
# Start a second transaction on a different connection without
- # blocking the test thread.
+ # blocking the test thread. Returns only after each thread has
+ # set its ready event.
self._storages = []
self._threads = []
@@ -242,5 +242,5 @@
def _get_timestamp(self):
t = time.time()
- t = TimeStamp(*time.gmtime(t)[:5]+(t%60,))
+ t = apply(TimeStamp,(time.gmtime(t)[:5]+(t%60,)))
return `t`
More information about the Zope-Checkins
mailing list