[Zope3-checkins] CVS: Zope3/src/zodb/zeo/tests - thread.py:1.1.2.1 commitlock.py:1.3.2.2

Jeremy Hylton jeremy@zope.com
Thu, 13 Feb 2003 13:23:25 -0500


Update of /cvs-repository/Zope3/src/zodb/zeo/tests
In directory cvs.zope.org:/tmp/cvs-serv9937/tests

Modified Files:
      Tag: ZODB3-2-integration-branch
	commitlock.py 
Added Files:
      Tag: ZODB3-2-integration-branch
	thread.py 
Log Message:
Update commit lock tests from ZODB3.2.


=== Added File Zope3/src/zodb/zeo/tests/thread.py ===
##############################################################################
#
# Copyright (c) 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
"""A Thread base class for use with unittest."""

from cStringIO import StringIO
import threading
import traceback

class TestThread(threading.Thread):
    __super_init = threading.Thread.__init__
    __super_run = threading.Thread.run

    def __init__(self, testcase, group=None, target=None, name=None,
                 args=(), kwargs={}, verbose=None):
        self.__super_init(group, target, name, args, kwargs, verbose)
        self.setDaemon(1)
        self._testcase = testcase

    def run(self):
        try:
            self.testrun()
        except Exception:
            s = StringIO()
            traceback.print_exc(file=s)
            self._testcase.fail("Exception in thread %s:\n%s\n" %
                                (self, s.getvalue()))

    def cleanup(self, timeout=15):
        self.join(timeout)
        if self.isAlive():
            self._testcase.fail("Thread did not finish: %s" % self)


=== Zope3/src/zodb/zeo/tests/commitlock.py 1.3.2.1 => 1.3.2.2 ===
--- Zope3/src/zodb/zeo/tests/commitlock.py:1.3.2.1	Mon Feb 10 17:08:29 2003
+++ Zope3/src/zodb/zeo/tests/commitlock.py	Thu Feb 13 13:23:25 2003
@@ -22,6 +22,7 @@
 
 from zodb.zeo.client import ClientStorage
 from zodb.zeo.interfaces import ClientDisconnected
+from zodb.zeo.tests.thread import TestThread
 
 ZERO = '\0'*8
 
@@ -29,36 +30,12 @@
     def invalidate(self, *args):
         pass
 
-class TestThread(threading.Thread):
-    __super_init = threading.Thread.__init__
-    __super_run = threading.Thread.run
-
-    def __init__(self, testcase, group=None, target=None, name=None,
-                 args=(), kwargs={}, verbose=None):
-        self.__super_init(group, target, name, args, kwargs, verbose)
-        self.setDaemon(1)
-        self._testcase = testcase
-
-    def run(self):
-        try:
-            self.testrun()
-        except Exception:
-            s = StringIO()
-            traceback.print_exc(file=s)
-            self._testcase.fail("Exception in thread %s:\n%s\n" %
-                                (self, s.getvalue()))
-
-    def cleanup(self, timeout=15):
-        self.join(timeout)
-        if self.isAlive():
-            self._testcase.fail("Thread did not finish: %s" % self)
-
 class WorkerThread(TestThread):
 
     # run the entire test in a thread so that the blocking call for
     # tpc_vote() doesn't hang the test suite.
 
-    def __init__(self, testcase, storage, trans, method="tpc_finish"):
+    def __init__(self, testcase, storage, trans, method="tpcFinish"):
         self.storage = storage
         self.trans = trans
         self.method = method
@@ -74,53 +51,45 @@
             oid = self.storage.newObjectId()
             p = zodb_pickle(MinPO("c"))
             self.storage.store(oid, ZERO, p, '', self.trans)
-            self.ready.set()
-            self.storage.tpcVote(self.trans)
-            if self.method == "tpc_finish":
+            self.myvote()
+            if self.method == "tpcFinish":
                 self.storage.tpcFinish(self.trans)
             else:
                 self.storage.tpcAbort(self.trans)
         except ClientDisconnected:
             pass
 
-class CommitLockTests:
-
-    # The commit lock tests verify that the storage successfully
-    # blocks and restarts transactions when there is content for a
-    # single storage.  There are a lot of cases to cover.
-
-    # CommitLock1 checks the case where a single transaction delays
-    # other transactions before they actually block.  IOW, by the time
-    # the other transactions get to the vote stage, the first
-    # transaction has finished.
-
-    def checkCommitLock1OnCommit(self):
-        self._storages = []
-        try:
-            self._checkCommitLock("tpc_finish", self._dosetup1, self._dowork1)
-        finally:
-            self._cleanup()
+    def myvote(self):
+        # The vote() call is synchronous, which makes it difficult to
+        # coordinate the action of multiple threads that all call
+        # vote().  This method sends the vote call, then sets the
+        # event saying vote was called, then waits for the vote
+        # response.  It digs deep into the implementation of the client.
+
+        # This method is a replacement for:
+        #     self.ready.set()
+        #     self.storage.tpc_vote(self.trans)
+
+        rpc = self.storage._server.rpc
+        msgid = rpc._deferred_call("tpcVote", self.storage._serial)
+        self.ready.set()
+        rpc._deferred_wait(msgid)
+        self.storage._check_serials()
 
-    def checkCommitLock1OnAbort(self):
-        self._storages = []
-        try:
-            self._checkCommitLock("tpc_abort", self._dosetup1, self._dowork1)
-        finally:
-            self._cleanup()
+class CommitLockTests:
 
-    def checkCommitLock2OnCommit(self):
-        self._storages = []
-        try:
-            self._checkCommitLock("tpc_finish", self._dosetup2, self._dowork2)
-        finally:
-            self._cleanup()
+    NUM_CLIENTS = 5
 
-    def checkCommitLock2OnAbort(self):
-        self._storages = []
-        try:
-            self._checkCommitLock("tpc_abort", self._dosetup2, self._dowork2)
-        finally:
-            self._cleanup()
+    # The commit lock tests verify that the storage successfully
+    # blocks and restarts transactions when there is contention for a
+    # single storage.  There are a lot of cases to cover.  transaction
+    # has finished.
+
+    # The general flow of these tests is to start a transaction by
+    # getting far enough into 2PC to acquire the commit lock.  Then
+    # begin one or more other connections that also want to commit.
+    # This causes the commit lock code to be exercised.  Once the
+    # other connections are started, the first transaction completes.
 
     def _cleanup(self):
         for store, trans in self._storages:
@@ -128,77 +97,134 @@
             store.close()
         self._storages = []
 
-    def _checkCommitLock(self, method_name, dosetup, dowork):
-        # check the commit lock when a client attemps a transaction,
-        # but fails/exits before finishing the commit.
-
-        # The general flow of these tests is to start a transaction by
-        # calling tpc_begin().  Then begin one or more other
-        # connections that also want to commit.  This causes the
-        # commit lock code to be exercised.  Once the other
-        # connections are started, the first transaction completes.
-        # Either by commit or abort, depending on whether method_name
-        # is "tpc_finish."
-
-        # The tests are parameterized by method_name, dosetup(), and
-        # dowork().  The dosetup() function is called with a
-        # connectioned client storage, transaction, and timestamp.
-        # Any work it does occurs after the first transaction has
-        # started, but before it finishes.  The dowork() function
-        # executes after the first transaction has completed.
-
-        # Start on transaction normally and get the lock.
-        t = Transaction()
-        self._storage.tpcBegin(t)
+    def _start_txn(self):
+        txn = Transaction()
+        self._storage.tpcBegin(txn)
         oid = self._storage.newObjectId()
-        self._storage.store(oid, ZERO, zodb_pickle(MinPO(1)), '', t)
-        self._storage.tpcVote(t)
+        self._storage.store(oid, ZERO, zodb_pickle(MinPO(1)), '', txn)
+        return oid, txn
 
-        # Start a second transaction on a different connection without
-        # blocking the test thread.
-        self._storages = []
-        for i in range(4):
-            storage2 = self._duplicate_client()
-            t2 = Transaction()
-            tid = self._get_timestamp()
-            dosetup(storage2, t2, tid)
-            if i == 0:
-                storage2.close()
-            else:
-                self._storages.append((storage2, t2))
+    def checkCommitLockVoteFinish(self):
+        oid, txn = self._start_txn()
+        self._storage.tpcVote(txn)
 
-        if method_name == "tpc_finish":
-            self._storage.tpcFinish(t)
-            self._storage.load(oid, '')
-        else:
-            self._storage.tpcAbort(t)
+        self._begin_threads()
 
-        dowork(method_name)
+        self._storage.tpcFinish(txn)
+        self._storage.load(oid, '')
+
+        self._finish_threads()
 
-        # Make sure the server is still responsive
         self._dostore()
+        self._cleanup()
+        
+    def checkCommitLockVoteAbort(self):
+        oid, txn = self._start_txn()
+        self._storage.tpcVote(txn)
 
-    def _dosetup1(self, storage, trans, tid):
-        storage.tpcBegin(trans, tid)
+        self._begin_threads()
 
-    def _dowork1(self, method_name):
-        for store, trans in self._storages:
-            oid = store.newObjectId()
-            store.store(oid, ZERO, zodb_pickle(MinPO("c")), '', trans)
-            store.tpcVote(trans)
-            if method_name == "tpc_finish":
-                store.tpcFinish(trans)
-            else:
-                store.tpcAbort(trans)
+        self._storage.tpcAbort(txn)
+
+        self._finish_threads()
+
+        self._dostore()
+        self._cleanup()
+        
+    def checkCommitLockVoteClose(self):
+        oid, txn = self._start_txn()
+        self._storage.tpcVote(txn)
+
+        self._begin_threads()
+
+        self._storage.close()
+
+        self._finish_threads()
+        self._cleanup()
+
+    def _get_trans_id(self):
+        self._dostore()
+        L = self._storage.undoInfo()
+        return L[0]['id']
+
+    def _begin_undo(self, trans_id):
+        rpc = self._storage._server.rpc
+        return rpc._deferred_call("undo", trans_id, self._storage._serial)
+
+    def _finish_undo(self, msgid):
+        return self._storage._server.rpc._deferred_wait(msgid)
+        
+    def checkCommitLockUndoFinish(self):
+        trans_id = self._get_trans_id()
+        oid, txn = self._start_txn()
+        msgid = self._begin_undo(trans_id)
+
+        self._begin_threads()
+
+        self._finish_undo(msgid)
+        self._storage.tpcVote(txn)
+        self._storage.tpcFinish(txn)
+        self._storage.load(oid, '')
 
-    def _dosetup2(self, storage, trans, tid):
+        self._finish_threads()
+
+        self._dostore()
+        self._cleanup()
+        
+    def checkCommitLockUndoAbort(self):
+        trans_id = self._get_trans_id()
+        oid, txn = self._start_txn()
+        msgid = self._begin_undo(trans_id)
+
+        self._begin_threads()
+
+        self._finish_undo(msgid)
+        self._storage.tpcVote(txn)
+        self._storage.tpcAbort(txn)
+
+        self._finish_threads()
+
+        self._dostore()
+        self._cleanup()
+        
+    def checkCommitLockUndoClose(self):
+        trans_id = self._get_trans_id()
+        oid, txn = self._start_txn()
+        msgid = self._begin_undo(trans_id)
+
+        self._begin_threads()
+
+        self._finish_undo(msgid)
+        self._storage.tpcVote(txn)
+        self._storage.close()
+
+        self._finish_threads()
+
+        self._cleanup()
+        
+    def _begin_threads(self):
+        # Start a second transaction on a different connection without
+        # blocking the test thread.
+        self._storages = []
         self._threads = []
-        t = WorkerThread(self, storage, trans)
-        self._threads.append(t)
-        t.start()
-        t.ready.wait()
+        
+        for i in range(self.NUM_CLIENTS):
+            storage = self._duplicate_client()
+            txn = Transaction()
+            tid = self._get_timestamp()
+            
+            t = WorkerThread(self, storage, txn)
+            self._threads.append(t)
+            t.start()
+            t.ready.wait()
+        
+            # Close on the connections abnormally to test server response
+            if i == 0:
+                storage.close()
+            else:
+                self._storages.append((storage, txn))
 
-    def _dowork2(self, method_name):
+    def _finish_threads(self):
         for t in self._threads:
             t.cleanup()
 
@@ -215,5 +241,5 @@
 
     def _get_timestamp(self):
         t = time.time()
-        ts = TimeStamp(*(time.gmtime(t)[:5] + (t % 60,)))
-        return ts.raw()
+        t = apply(TimeStamp,(time.gmtime(t)[:5]+(t%60,)))
+        return `t`