[Zodb-checkins] CVS: StandaloneZODB/ZEO/tests - CommitLockTests.py:1.1.2.3
Jeremy Hylton
jeremy@zope.com
Tue, 7 May 2002 00:43:15 -0400
Update of /cvs-repository/StandaloneZODB/ZEO/tests
In directory cvs.zope.org:/tmp/cvs-serv17792/tests
Modified Files:
Tag: ZEO2-branch
CommitLockTests.py
Log Message:
Add a multi-threaded test of the commit lock that actually queues
clients.
=== StandaloneZODB/ZEO/tests/CommitLockTests.py 1.1.2.2 => 1.1.2.3 ===
+import threading
+
from ZODB.Transaction import Transaction
from ZODB.tests.StorageTestBase import zodb_pickle, MinPO
import ZEO.ClientStorage
+from ZEO.Exceptions import Disconnected
ZERO = '\0'*8
@@ -11,19 +14,68 @@
def invalidate(self, *args):
pass
+class WorkerThread(threading.Thread):
+
+ # run the entire test in a thread so that the blocking call for
+ # tpc_vote() doesn't hang the test suite.
+
+ def __init__(self, storage, trans, method="tpc_finish"):
+ self.storage = storage
+ self.trans = trans
+ self.method = method
+ threading.Thread.__init__(self)
+
+ def run(self):
+ try:
+ self.storage.tpc_begin(self.trans)
+ except Disconnected:
+ return
+ oid = self.storage.new_oid()
+ self.storage.store(oid, ZERO, zodb_pickle(MinPO("c")), '', self.trans)
+ oid = self.storage.new_oid()
+ self.storage.store(oid, ZERO, zodb_pickle(MinPO("c")), '', self.trans)
+ self.storage.tpc_vote(self.trans)
+ if self.method == "tpc_finish":
+ self.storage.tpc_finish(self.trans)
+ else:
+ self.storage.tpc_abort(self.trans)
+
class CommitLockTests:
+
+ # The commit lock tests verify that the storage successfully
+ # blocks and restarts transactions when there is content for a
+ # single storage. There are a lot of cases to cover.
+
+ # CommitLock1 checks the case where a single transaction delays
+ # other transactions before they actually block. IOW, by the time
+ # the other transactions get to the vote stage, the first
+ # transaction has finished.
- def checkCommitLockOnCommit(self):
+ def checkCommitLock1OnCommit(self):
+ self._storages = []
+ try:
+ self._checkCommitLock("tpc_finish", self._dosetup1, self._dowork1)
+ finally:
+ self._cleanup()
+
+ def checkCommitLock1OnAbort(self):
self._storages = []
try:
- self._checkCommitLock("tpc_finish")
+ self._checkCommitLock("tpc_abort", self._dosetup1, self._dowork1)
finally:
self._cleanup()
- def checkCommitLockOnAbort(self):
+ def checkCommitLock2OnCommit(self):
self._storages = []
try:
- self._checkCommitLock("tpc_abort")
+ self._checkCommitLock("tpc_finish", self._dosetup2, self._dowork2)
+ finally:
+ self._cleanup()
+
+ def checkCommitLock2OnAbort(self):
+ self._storages = []
+ try:
+ self._checkCommitLock("tpc_abort", self._dosetup2, self._dowork2)
finally:
self._cleanup()
@@ -33,7 +85,7 @@
store.close()
self._storages = []
- def _checkCommitLock(self, method_name):
+ def _checkCommitLock(self, method_name, dosetup, dowork):
# check the commit lock when a client attemps a transaction,
# but fails/exits before finishing the commit.
@@ -44,12 +96,11 @@
# Start a second transaction on a different connection without
# blocking the test thread.
self._storages = []
- for i in range(3):
+ for i in range(4):
storage2 = self._duplicate_client()
t2 = Transaction()
- # ???
- tid = `ZEO.ClientStorage.get_timestamp()`
- storage2.tpc_begin(t2, tid)
+ tid = `ZEO.ClientStorage.get_timestamp()` # XXX why?
+ dosetup(storage2, t2, tid)
if i == 0:
storage2.close()
else:
@@ -64,17 +115,33 @@
else:
self._storage.tpc_abort(t)
- self._dowork()
+ dowork(method_name)
# Make sure the server is still responsive
self._dostore()
- def _dowork(self):
+ def _dosetup1(self, storage, trans, tid):
+ storage.tpc_begin(trans, tid)
+
+ def _dowork1(self, method_name):
for store, trans in self._storages:
oid = store.new_oid()
store.store(oid, ZERO, zodb_pickle(MinPO("c")), '', trans)
store.tpc_vote(trans)
- store.tpc_abort(trans)
+ if method_name == "tpc_finish":
+ store.tpc_finish(trans)
+ else:
+ store.tpc_abort(trans)
+
+ def _dosetup2(self, storage, trans, tid):
+ self._threads = []
+ t = WorkerThread(storage, trans)
+ self._threads.append(t)
+ t.start()
+
+ def _dowork2(self, method_name):
+ for t in self._threads:
+ t.join()
def _duplicate_client(self):
"Open another ClientStorage to the same server."