[Zope-Checkins] CVS: ZODB3/ZEO/tests - InvalidationTests.py:1.1.4.1 ConnectionTests.py:1.4.2.5
Jeremy Hylton
jeremy@zope.com
Wed, 11 Jun 2003 19:12:02 -0400
Update of /cvs-repository/ZODB3/ZEO/tests
In directory cvs.zope.org:/tmp/cvs-serv6722/ZEO/tests
Modified Files:
Tag: ZODB3-3_1-branch
ConnectionTests.py
Added Files:
Tag: ZODB3-3_1-branch
InvalidationTests.py
Log Message:
Merge tim-loading_oids_status-branch to ZODB 3.1 release branch.
Includes four critical bug fixes.
=== Added File ZODB3/ZEO/tests/InvalidationTests.py ===
##############################################################################
#
# Copyright (c) 2003 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
import threading
import time
from BTrees.check import check, display
from BTrees.OOBTree import OOBTree
from ZEO.tests.TestThread import TestThread
from ZODB.DB import DB
from ZODB.POSException \
import ReadConflictError, ConflictError, VersionLockError
class StressThread(TestThread):
def __init__(self, testcase, db, stop, threadnum, startnum,
step=2, sleep=None):
TestThread.__init__(self, testcase)
self.db = db
self.stop = stop
self.threadnum = threadnum
self.startnum = startnum
self.step = step
self.sleep = sleep
self.added_keys = []
def testrun(self):
cn = self.db.open()
while not self.stop.isSet():
try:
tree = cn.root()["tree"]
break
except (ConflictError, KeyError):
get_transaction().abort()
cn.sync()
key = self.startnum
while not self.stop.isSet():
try:
tree[key] = self.threadnum
get_transaction().commit()
if self.sleep:
time.sleep(self.sleep)
except (ReadConflictError, ConflictError), msg:
get_transaction().abort()
# sync() is necessary here to process invalidations
# if we get a read conflict. In the read conflict case,
# no objects were modified so cn never got registered
# with the transaction.
cn.sync()
else:
self.added_keys.append(key)
key += self.step
cn.close()
class VersionStressThread(TestThread):
def __init__(self, testcase, db, stop, threadnum, startnum,
step=2, sleep=None):
TestThread.__init__(self, testcase)
self.db = db
self.stop = stop
self.threadnum = threadnum
self.startnum = startnum
self.step = step
self.sleep = sleep
self.added_keys = []
def testrun(self):
commit = 0
key = self.startnum
while not self.stop.isSet():
version = "%s:%s" % (self.threadnum, key)
commit = not commit
if self.oneupdate(version, key, commit):
self.added_keys.append(key)
key += self.step
def oneupdate(self, version, key, commit=1):
# The mess of sleeps below were added to reduce the number
# of VersionLockErrors, based on empirical observation.
# It looks like the threads don't switch enough without
# the sleeps.
cn = self.db.open(version)
while not self.stop.isSet():
try:
tree = cn.root()["tree"]
break
except (ConflictError, KeyError):
get_transaction().abort()
cn.sync()
while not self.stop.isSet():
try:
tree[key] = self.threadnum
get_transaction().commit()
if self.sleep:
time.sleep(self.sleep)
break
except (VersionLockError, ReadConflictError, ConflictError), msg:
get_transaction().abort()
# sync() is necessary here to process invalidations
# if we get a read conflict. In the read conflict case,
# no objects were modified so cn never got registered
# with the transaction.
cn.sync()
if self.sleep:
time.sleep(self.sleep)
try:
while not self.stop.isSet():
try:
if commit:
self.db.commitVersion(version)
get_transaction().note("commit version %s" % version)
else:
self.db.abortVersion(version)
get_transaction().note("abort version %s" % version)
get_transaction().commit()
if self.sleep:
time.sleep(self.sleep)
return commit
except ConflictError, msg:
get_transaction().abort()
cn.sync()
finally:
cn.close()
return 0
class InvalidationTests:
level = 2
DELAY = 15
def _check_tree(self, cn, tree):
# Make sure the BTree is sane and that all the updates persisted
retries = 3
while retries:
retries -= 1
try:
check(tree)
tree._check()
except ReadConflictError:
if retries:
get_transaction().abort()
cn.sync()
else:
raise
except:
display(tree)
raise
def _check_threads(self, tree, *threads):
# Make sure the thread's view of the world is consistent with
# the actual database state.
for t in threads:
# If the test didn't add any keys, it didn't do what we expected.
self.assert_(t.added_keys)
for key in t.added_keys:
self.assert_(tree.has_key(key))
def go(self, stop, *threads):
# Run the threads
for t in threads:
t.start()
time.sleep(self.DELAY)
stop.set()
for t in threads:
t.cleanup()
def checkConcurrentUpdates2Storages(self):
self._storage = storage1 = self.openClientStorage()
storage2 = self.openClientStorage()
db1 = DB(storage1)
db2 = DB(storage2)
stop = threading.Event()
cn = db1.open()
tree = cn.root()["tree"] = OOBTree()
get_transaction().commit()
# Run two threads that update the BTree
t1 = StressThread(self, db1, stop, 1, 1)
t2 = StressThread(self, db2, stop, 2, 2)
self.go(stop, t1, t2)
cn.sync()
self._check_tree(cn, tree)
self._check_threads(tree, t1, t2)
cn.close()
db1.close()
db2.close()
def checkConcurrentUpdates1Storage(self):
self._storage = storage1 = self.openClientStorage()
db1 = DB(storage1)
stop = threading.Event()
cn = db1.open()
tree = cn.root()["tree"] = OOBTree()
get_transaction().commit()
# Run two threads that update the BTree
t1 = StressThread(self, db1, stop, 1, 1, sleep=0.001)
t2 = StressThread(self, db1, stop, 2, 2, sleep=0.001)
self.go(stop, t1, t2)
cn.sync()
self._check_tree(cn, tree)
self._check_threads(tree, t1, t2)
cn.close()
db1.close()
def checkConcurrentUpdates2StoragesMT(self):
self._storage = storage1 = self.openClientStorage()
db1 = DB(storage1)
db2 = DB(self.openClientStorage())
stop = threading.Event()
cn = db1.open()
tree = cn.root()["tree"] = OOBTree()
get_transaction().commit()
# Run three threads that update the BTree.
# Two of the threads share a single storage so that it
# is possible for both threads to read the same object
# at the same time.
t1 = StressThread(self, db1, stop, 1, 1, 3)
t2 = StressThread(self, db2, stop, 2, 2, 3, 0.001)
t3 = StressThread(self, db2, stop, 3, 3, 3, 0.001)
self.go(stop, t1, t2, t3)
cn.sync()
self._check_tree(cn, tree)
self._check_threads(tree, t1, t2, t3)
cn.close()
db1.close()
db2.close()
def checkConcurrentUpdatesInVersions(self):
self._storage = storage1 = self.openClientStorage()
db1 = DB(storage1)
db2 = DB(self.openClientStorage())
stop = threading.Event()
cn = db1.open()
tree = cn.root()["tree"] = OOBTree()
get_transaction().commit()
# Run three threads that update the BTree.
# Two of the threads share a single storage so that it
# is possible for both threads to read the same object
# at the same time.
t1 = VersionStressThread(self, db1, stop, 1, 1, 3)
t2 = VersionStressThread(self, db2, stop, 2, 2, 3, 0.001)
t3 = VersionStressThread(self, db2, stop, 3, 3, 3, 0.001)
self.go(stop, t1, t2, t3)
cn.sync()
self._check_tree(cn, tree)
self._check_threads(tree, t1, t2, t3)
cn.close()
db1.close()
db2.close()
=== ZODB3/ZEO/tests/ConnectionTests.py 1.4.2.4 => 1.4.2.5 ===
--- ZODB3/ZEO/tests/ConnectionTests.py:1.4.2.4 Tue Apr 29 17:39:56 2003
+++ ZODB3/ZEO/tests/ConnectionTests.py Wed Jun 11 19:11:32 2003
@@ -26,6 +26,7 @@
from ZEO.ClientStorage import ClientStorage, ClientDisconnected
from ZEO.Exceptions import Disconnected
from ZEO.zrpc.marshal import Marshaller
+from ZEO.tests.InvalidationTests import InvalidationTests
from ZODB.Transaction import get_transaction, Transaction
from ZODB.POSException import ReadOnlyError
@@ -37,7 +38,8 @@
def invalidate(self, *args):
pass
-class ConnectionTests(StorageTestBase.StorageTestBase):
+class ConnectionTests(StorageTestBase.StorageTestBase,
+ InvalidationTests):
"""Tests that explicitly manage the server process.
To test the cache or re-connection, these test cases explicit
@@ -53,7 +55,7 @@
getStorage() method.
"""
zLOG.LOG("testZEO", zLOG.INFO, "setUp() %s" % self.id())
- self.file = tempfile.mktemp()
+ self.file = tempfile.mktemp(".fs")
self.addr = []
self._pids = []
self._servers = []
@@ -574,3 +576,4 @@
txn = Transaction()
storage.tpc_begin(txn)
storage.tpc_abort(txn)
+