[Zope-Checkins] CVS: ZODB3/ZEO/tests - CommitLockTests.py:1.11
Jeremy Hylton
jeremy@zope.com
Tue, 7 Jan 2003 14:26:16 -0500
Update of /cvs-repository/ZODB3/ZEO/tests
In directory cvs.zope.org:/tmp/cvs-serv21743/ZEO/tests
Modified Files:
CommitLockTests.py
Log Message:
Rewrite tests to make sure they actually test the commit lock.
=== ZODB3/ZEO/tests/CommitLockTests.py 1.10 => 1.11 ===
--- ZODB3/ZEO/tests/CommitLockTests.py:1.10 Mon Nov 18 18:17:40 2002
+++ ZODB3/ZEO/tests/CommitLockTests.py Tue Jan 7 14:26:13 2003
@@ -51,8 +51,7 @@
oid = self.storage.new_oid()
p = zodb_pickle(MinPO("c"))
self.storage.store(oid, ZERO, p, '', self.trans)
- self.ready.set()
- self.storage.tpc_vote(self.trans)
+ self.myvote()
if self.method == "tpc_finish":
self.storage.tpc_finish(self.trans)
else:
@@ -60,44 +59,37 @@
except Disconnected:
pass
-class CommitLockTests:
-
- # The commit lock tests verify that the storage successfully
- # blocks and restarts transactions when there is content for a
- # single storage. There are a lot of cases to cover.
-
- # CommitLock1 checks the case where a single transaction delays
- # other transactions before they actually block. IOW, by the time
- # the other transactions get to the vote stage, the first
- # transaction has finished.
-
- def checkCommitLock1OnCommit(self):
- self._storages = []
- try:
- self._checkCommitLock("tpc_finish", self._dosetup1, self._dowork1)
- finally:
- self._cleanup()
+ def myvote(self):
+ # The vote() call is synchronous, which makes it difficult to
+ # coordinate the action of multiple threads that all call
+ # vote(). This method sends the vote call, then sets the
+ # event saying vote was called, then waits for the vote
+ # response. It digs deep into the implementation of the client.
+
+ # This method is a replacement for:
+ # self.ready.set()
+ # self.storage.tpc_vote(self.trans)
+
+ rpc = self.storage._server.rpc
+ msgid = rpc._deferred_call('vote', self.storage._serial)
+ self.ready.set()
+ rpc._deferred_wait(msgid)
+ self.storage._check_serials()
- def checkCommitLock1OnAbort(self):
- self._storages = []
- try:
- self._checkCommitLock("tpc_abort", self._dosetup1, self._dowork1)
- finally:
- self._cleanup()
+class CommitLockTests:
- def checkCommitLock2OnCommit(self):
- self._storages = []
- try:
- self._checkCommitLock("tpc_finish", self._dosetup2, self._dowork2)
- finally:
- self._cleanup()
+ NUM_CLIENTS = 5
- def checkCommitLock2OnAbort(self):
- self._storages = []
- try:
- self._checkCommitLock("tpc_abort", self._dosetup2, self._dowork2)
- finally:
- self._cleanup()
+ # The commit lock tests verify that the storage successfully
+ # blocks and restarts transactions when there is contention for a
+ # single storage. There are a lot of cases to cover. transaction
+ # has finished.
+
+ # The general flow of these tests is to start a transaction by
+ # getting far enough into 2PC to acquire the commit lock. Then
+ # begin one or more other connections that also want to commit.
+ # This causes the commit lock code to be exercised. Once the
+ # other connections are started, the first transaction completes.
def _cleanup(self):
for store, trans in self._storages:
@@ -105,77 +97,135 @@
store.close()
self._storages = []
- def _checkCommitLock(self, method_name, dosetup, dowork):
- # check the commit lock when a client attemps a transaction,
- # but fails/exits before finishing the commit.
-
- # The general flow of these tests is to start a transaction by
- # calling tpc_begin(). Then begin one or more other
- # connections that also want to commit. This causes the
- # commit lock code to be exercised. Once the other
- # connections are started, the first transaction completes.
- # Either by commit or abort, depending on whether method_name
- # is "tpc_finish."
-
- # The tests are parameterized by method_name, dosetup(), and
- # dowork(). The dosetup() function is called with a
- # connectioned client storage, transaction, and timestamp.
- # Any work it does occurs after the first transaction has
- # started, but before it finishes. The dowork() function
- # executes after the first transaction has completed.
-
- # Start on transaction normally and get the lock.
- t = Transaction()
- self._storage.tpc_begin(t)
+ def _start_txn(self):
+ txn = Transaction()
+ self._storage.tpc_begin(txn)
oid = self._storage.new_oid()
- self._storage.store(oid, ZERO, zodb_pickle(MinPO(1)), '', t)
- self._storage.tpc_vote(t)
+ self._storage.store(oid, ZERO, zodb_pickle(MinPO(1)), '', txn)
+ return oid, txn
- # Start a second transaction on a different connection without
- # blocking the test thread.
- self._storages = []
- for i in range(4):
- storage2 = self._duplicate_client()
- t2 = Transaction()
- tid = self._get_timestamp()
- dosetup(storage2, t2, tid)
- if i == 0:
- storage2.close()
- else:
- self._storages.append((storage2, t2))
+ def checkCommitLockVoteFinish(self):
+ oid, txn = self._start_txn()
+ self._storage.tpc_vote(txn)
+
+ self._begin_threads()
- if method_name == "tpc_finish":
- self._storage.tpc_finish(t)
- self._storage.load(oid, '')
- else:
- self._storage.tpc_abort(t)
+ self._storage.tpc_finish(txn)
+ self._storage.load(oid, '')
- dowork(method_name)
+ self._finish_threads()
- # Make sure the server is still responsive
self._dostore()
+ self._cleanup()
+
+ def checkCommitLockVoteAbort(self):
+ oid, txn = self._start_txn()
+ self._storage.tpc_vote(txn)
- def _dosetup1(self, storage, trans, tid):
- storage.tpc_begin(trans, tid)
+ self._begin_threads()
- def _dowork1(self, method_name):
- for store, trans in self._storages:
- oid = store.new_oid()
- store.store(oid, ZERO, zodb_pickle(MinPO("c")), '', trans)
- store.tpc_vote(trans)
- if method_name == "tpc_finish":
- store.tpc_finish(trans)
- else:
- store.tpc_abort(trans)
+ self._storage.tpc_abort(txn)
- def _dosetup2(self, storage, trans, tid):
+ self._finish_threads()
+
+ self._dostore()
+ self._cleanup()
+
+ def checkCommitLockVoteClose(self):
+ oid, txn = self._start_txn()
+ self._storage.tpc_vote(txn)
+
+ self._begin_threads()
+
+ self._storage.close()
+
+ self._finish_threads()
+ self._cleanup()
+
+ def _get_trans_id(self):
+ self._dostore()
+ L = self._storage.undoInfo()
+ return L[0]['id']
+
+ def _begin_undo(self, trans_id):
+ rpc = self._storage._server.rpc
+ return rpc._deferred_call('transactionalUndo', trans_id,
+ self._storage._serial)
+
+ def _finish_undo(self, msgid):
+ return self._storage._server.rpc._deferred_wait(msgid)
+
+ def checkCommitLockUndoFinish(self):
+ trans_id = self._get_trans_id()
+ oid, txn = self._start_txn()
+ msgid = self._begin_undo(trans_id)
+
+ self._begin_threads()
+
+ self._finish_undo(msgid)
+ self._storage.tpc_vote(txn)
+ self._storage.tpc_finish(txn)
+ self._storage.load(oid, '')
+
+ self._finish_threads()
+
+ self._dostore()
+ self._cleanup()
+
+ def checkCommitLockUndoAbort(self):
+ trans_id = self._get_trans_id()
+ oid, txn = self._start_txn()
+ msgid = self._begin_undo(trans_id)
+
+ self._begin_threads()
+
+ self._finish_undo(msgid)
+ self._storage.tpc_vote(txn)
+ self._storage.tpc_abort(txn)
+
+ self._finish_threads()
+
+ self._dostore()
+ self._cleanup()
+
+ def checkCommitLockUndoClose(self):
+ trans_id = self._get_trans_id()
+ oid, txn = self._start_txn()
+ msgid = self._begin_undo(trans_id)
+
+ self._begin_threads()
+
+ self._finish_undo(msgid)
+ self._storage.tpc_vote(txn)
+ self._storage.close()
+
+ self._finish_threads()
+
+ self._cleanup()
+
+ def _begin_threads(self):
+ # Start a second transaction on a different connection without
+ # blocking the test thread.
+ self._storages = []
self._threads = []
- t = WorkerThread(self, storage, trans)
- self._threads.append(t)
- t.start()
- t.ready.wait()
+
+ for i in range(self.NUM_CLIENTS):
+ storage = self._duplicate_client()
+ txn = Transaction()
+ tid = self._get_timestamp()
+
+ t = WorkerThread(self, storage, txn)
+ self._threads.append(t)
+ t.start()
+ t.ready.wait()
+
+ # Close on the connections abnormally to test server response
+ if i == 0:
+ storage.close()
+ else:
+ self._storages.append((storage, txn))
- def _dowork2(self, method_name):
+ def _finish_threads(self):
for t in self._threads:
t.cleanup()