[Zope3-checkins] CVS: Zope3/src/zodb/zeo/tests - thread.py:1.1.2.1 commitlock.py:1.3.2.2
Jeremy Hylton
jeremy@zope.com
Thu, 13 Feb 2003 13:23:25 -0500
Update of /cvs-repository/Zope3/src/zodb/zeo/tests
In directory cvs.zope.org:/tmp/cvs-serv9937/tests
Modified Files:
Tag: ZODB3-2-integration-branch
commitlock.py
Added Files:
Tag: ZODB3-2-integration-branch
thread.py
Log Message:
Update commit lock tests from ZODB3.2.
=== Added File Zope3/src/zodb/zeo/tests/thread.py ===
##############################################################################
#
# Copyright (c) 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
"""A Thread base class for use with unittest."""
from cStringIO import StringIO
import threading
import traceback
class TestThread(threading.Thread):
__super_init = threading.Thread.__init__
__super_run = threading.Thread.run
def __init__(self, testcase, group=None, target=None, name=None,
args=(), kwargs={}, verbose=None):
self.__super_init(group, target, name, args, kwargs, verbose)
self.setDaemon(1)
self._testcase = testcase
def run(self):
try:
self.testrun()
except Exception:
s = StringIO()
traceback.print_exc(file=s)
self._testcase.fail("Exception in thread %s:\n%s\n" %
(self, s.getvalue()))
def cleanup(self, timeout=15):
self.join(timeout)
if self.isAlive():
self._testcase.fail("Thread did not finish: %s" % self)
=== Zope3/src/zodb/zeo/tests/commitlock.py 1.3.2.1 => 1.3.2.2 ===
--- Zope3/src/zodb/zeo/tests/commitlock.py:1.3.2.1 Mon Feb 10 17:08:29 2003
+++ Zope3/src/zodb/zeo/tests/commitlock.py Thu Feb 13 13:23:25 2003
@@ -22,6 +22,7 @@
from zodb.zeo.client import ClientStorage
from zodb.zeo.interfaces import ClientDisconnected
+from zodb.zeo.tests.thread import TestThread
ZERO = '\0'*8
@@ -29,36 +30,12 @@
def invalidate(self, *args):
pass
-class TestThread(threading.Thread):
- __super_init = threading.Thread.__init__
- __super_run = threading.Thread.run
-
- def __init__(self, testcase, group=None, target=None, name=None,
- args=(), kwargs={}, verbose=None):
- self.__super_init(group, target, name, args, kwargs, verbose)
- self.setDaemon(1)
- self._testcase = testcase
-
- def run(self):
- try:
- self.testrun()
- except Exception:
- s = StringIO()
- traceback.print_exc(file=s)
- self._testcase.fail("Exception in thread %s:\n%s\n" %
- (self, s.getvalue()))
-
- def cleanup(self, timeout=15):
- self.join(timeout)
- if self.isAlive():
- self._testcase.fail("Thread did not finish: %s" % self)
-
class WorkerThread(TestThread):
# run the entire test in a thread so that the blocking call for
# tpc_vote() doesn't hang the test suite.
- def __init__(self, testcase, storage, trans, method="tpc_finish"):
+ def __init__(self, testcase, storage, trans, method="tpcFinish"):
self.storage = storage
self.trans = trans
self.method = method
@@ -74,53 +51,45 @@
oid = self.storage.newObjectId()
p = zodb_pickle(MinPO("c"))
self.storage.store(oid, ZERO, p, '', self.trans)
- self.ready.set()
- self.storage.tpcVote(self.trans)
- if self.method == "tpc_finish":
+ self.myvote()
+ if self.method == "tpcFinish":
self.storage.tpcFinish(self.trans)
else:
self.storage.tpcAbort(self.trans)
except ClientDisconnected:
pass
-class CommitLockTests:
-
- # The commit lock tests verify that the storage successfully
- # blocks and restarts transactions when there is content for a
- # single storage. There are a lot of cases to cover.
-
- # CommitLock1 checks the case where a single transaction delays
- # other transactions before they actually block. IOW, by the time
- # the other transactions get to the vote stage, the first
- # transaction has finished.
-
- def checkCommitLock1OnCommit(self):
- self._storages = []
- try:
- self._checkCommitLock("tpc_finish", self._dosetup1, self._dowork1)
- finally:
- self._cleanup()
+ def myvote(self):
+ # The vote() call is synchronous, which makes it difficult to
+ # coordinate the action of multiple threads that all call
+ # vote(). This method sends the vote call, then sets the
+ # event saying vote was called, then waits for the vote
+ # response. It digs deep into the implementation of the client.
+
+ # This method is a replacement for:
+ # self.ready.set()
+ # self.storage.tpc_vote(self.trans)
+
+ rpc = self.storage._server.rpc
+ msgid = rpc._deferred_call("tpcVote", self.storage._serial)
+ self.ready.set()
+ rpc._deferred_wait(msgid)
+ self.storage._check_serials()
- def checkCommitLock1OnAbort(self):
- self._storages = []
- try:
- self._checkCommitLock("tpc_abort", self._dosetup1, self._dowork1)
- finally:
- self._cleanup()
+class CommitLockTests:
- def checkCommitLock2OnCommit(self):
- self._storages = []
- try:
- self._checkCommitLock("tpc_finish", self._dosetup2, self._dowork2)
- finally:
- self._cleanup()
+ NUM_CLIENTS = 5
- def checkCommitLock2OnAbort(self):
- self._storages = []
- try:
- self._checkCommitLock("tpc_abort", self._dosetup2, self._dowork2)
- finally:
- self._cleanup()
+ # The commit lock tests verify that the storage successfully
+ # blocks and restarts transactions when there is contention for a
+ # single storage. There are a lot of cases to cover. transaction
+ # has finished.
+
+ # The general flow of these tests is to start a transaction by
+ # getting far enough into 2PC to acquire the commit lock. Then
+ # begin one or more other connections that also want to commit.
+ # This causes the commit lock code to be exercised. Once the
+ # other connections are started, the first transaction completes.
def _cleanup(self):
for store, trans in self._storages:
@@ -128,77 +97,134 @@
store.close()
self._storages = []
- def _checkCommitLock(self, method_name, dosetup, dowork):
- # check the commit lock when a client attemps a transaction,
- # but fails/exits before finishing the commit.
-
- # The general flow of these tests is to start a transaction by
- # calling tpc_begin(). Then begin one or more other
- # connections that also want to commit. This causes the
- # commit lock code to be exercised. Once the other
- # connections are started, the first transaction completes.
- # Either by commit or abort, depending on whether method_name
- # is "tpc_finish."
-
- # The tests are parameterized by method_name, dosetup(), and
- # dowork(). The dosetup() function is called with a
- # connectioned client storage, transaction, and timestamp.
- # Any work it does occurs after the first transaction has
- # started, but before it finishes. The dowork() function
- # executes after the first transaction has completed.
-
- # Start on transaction normally and get the lock.
- t = Transaction()
- self._storage.tpcBegin(t)
+ def _start_txn(self):
+ txn = Transaction()
+ self._storage.tpcBegin(txn)
oid = self._storage.newObjectId()
- self._storage.store(oid, ZERO, zodb_pickle(MinPO(1)), '', t)
- self._storage.tpcVote(t)
+ self._storage.store(oid, ZERO, zodb_pickle(MinPO(1)), '', txn)
+ return oid, txn
- # Start a second transaction on a different connection without
- # blocking the test thread.
- self._storages = []
- for i in range(4):
- storage2 = self._duplicate_client()
- t2 = Transaction()
- tid = self._get_timestamp()
- dosetup(storage2, t2, tid)
- if i == 0:
- storage2.close()
- else:
- self._storages.append((storage2, t2))
+ def checkCommitLockVoteFinish(self):
+ oid, txn = self._start_txn()
+ self._storage.tpcVote(txn)
- if method_name == "tpc_finish":
- self._storage.tpcFinish(t)
- self._storage.load(oid, '')
- else:
- self._storage.tpcAbort(t)
+ self._begin_threads()
- dowork(method_name)
+ self._storage.tpcFinish(txn)
+ self._storage.load(oid, '')
+
+ self._finish_threads()
- # Make sure the server is still responsive
self._dostore()
+ self._cleanup()
+
+ def checkCommitLockVoteAbort(self):
+ oid, txn = self._start_txn()
+ self._storage.tpcVote(txn)
- def _dosetup1(self, storage, trans, tid):
- storage.tpcBegin(trans, tid)
+ self._begin_threads()
- def _dowork1(self, method_name):
- for store, trans in self._storages:
- oid = store.newObjectId()
- store.store(oid, ZERO, zodb_pickle(MinPO("c")), '', trans)
- store.tpcVote(trans)
- if method_name == "tpc_finish":
- store.tpcFinish(trans)
- else:
- store.tpcAbort(trans)
+ self._storage.tpcAbort(txn)
+
+ self._finish_threads()
+
+ self._dostore()
+ self._cleanup()
+
+ def checkCommitLockVoteClose(self):
+ oid, txn = self._start_txn()
+ self._storage.tpcVote(txn)
+
+ self._begin_threads()
+
+ self._storage.close()
+
+ self._finish_threads()
+ self._cleanup()
+
+ def _get_trans_id(self):
+ self._dostore()
+ L = self._storage.undoInfo()
+ return L[0]['id']
+
+ def _begin_undo(self, trans_id):
+ rpc = self._storage._server.rpc
+ return rpc._deferred_call("undo", trans_id, self._storage._serial)
+
+ def _finish_undo(self, msgid):
+ return self._storage._server.rpc._deferred_wait(msgid)
+
+ def checkCommitLockUndoFinish(self):
+ trans_id = self._get_trans_id()
+ oid, txn = self._start_txn()
+ msgid = self._begin_undo(trans_id)
+
+ self._begin_threads()
+
+ self._finish_undo(msgid)
+ self._storage.tpcVote(txn)
+ self._storage.tpcFinish(txn)
+ self._storage.load(oid, '')
- def _dosetup2(self, storage, trans, tid):
+ self._finish_threads()
+
+ self._dostore()
+ self._cleanup()
+
+ def checkCommitLockUndoAbort(self):
+ trans_id = self._get_trans_id()
+ oid, txn = self._start_txn()
+ msgid = self._begin_undo(trans_id)
+
+ self._begin_threads()
+
+ self._finish_undo(msgid)
+ self._storage.tpcVote(txn)
+ self._storage.tpcAbort(txn)
+
+ self._finish_threads()
+
+ self._dostore()
+ self._cleanup()
+
+ def checkCommitLockUndoClose(self):
+ trans_id = self._get_trans_id()
+ oid, txn = self._start_txn()
+ msgid = self._begin_undo(trans_id)
+
+ self._begin_threads()
+
+ self._finish_undo(msgid)
+ self._storage.tpcVote(txn)
+ self._storage.close()
+
+ self._finish_threads()
+
+ self._cleanup()
+
+ def _begin_threads(self):
+ # Start a second transaction on a different connection without
+ # blocking the test thread.
+ self._storages = []
self._threads = []
- t = WorkerThread(self, storage, trans)
- self._threads.append(t)
- t.start()
- t.ready.wait()
+
+ for i in range(self.NUM_CLIENTS):
+ storage = self._duplicate_client()
+ txn = Transaction()
+ tid = self._get_timestamp()
+
+ t = WorkerThread(self, storage, txn)
+ self._threads.append(t)
+ t.start()
+ t.ready.wait()
+
+ # Close on the connections abnormally to test server response
+ if i == 0:
+ storage.close()
+ else:
+ self._storages.append((storage, txn))
- def _dowork2(self, method_name):
+ def _finish_threads(self):
for t in self._threads:
t.cleanup()
@@ -215,5 +241,5 @@
def _get_timestamp(self):
t = time.time()
- ts = TimeStamp(*(time.gmtime(t)[:5] + (t % 60,)))
- return ts.raw()
+ t = apply(TimeStamp,(time.gmtime(t)[:5]+(t%60,)))
+ return `t`