[Zope-Checkins] CVS: ZODB3/ZEO/tests - CommitLockTests.py:1.11

Jeremy Hylton jeremy@zope.com
Tue, 7 Jan 2003 14:26:16 -0500


Update of /cvs-repository/ZODB3/ZEO/tests
In directory cvs.zope.org:/tmp/cvs-serv21743/ZEO/tests

Modified Files:
	CommitLockTests.py 
Log Message:
Rewrite tests to make sure they actually test the commit lock.


=== ZODB3/ZEO/tests/CommitLockTests.py 1.10 => 1.11 ===
--- ZODB3/ZEO/tests/CommitLockTests.py:1.10	Mon Nov 18 18:17:40 2002
+++ ZODB3/ZEO/tests/CommitLockTests.py	Tue Jan  7 14:26:13 2003
@@ -51,8 +51,7 @@
             oid = self.storage.new_oid()
             p = zodb_pickle(MinPO("c"))
             self.storage.store(oid, ZERO, p, '', self.trans)
-            self.ready.set()
-            self.storage.tpc_vote(self.trans)
+            self.myvote()
             if self.method == "tpc_finish":
                 self.storage.tpc_finish(self.trans)
             else:
@@ -60,44 +59,37 @@
         except Disconnected:
             pass
 
-class CommitLockTests:
-
-    # The commit lock tests verify that the storage successfully
-    # blocks and restarts transactions when there is content for a
-    # single storage.  There are a lot of cases to cover.
-
-    # CommitLock1 checks the case where a single transaction delays
-    # other transactions before they actually block.  IOW, by the time
-    # the other transactions get to the vote stage, the first
-    # transaction has finished.
-
-    def checkCommitLock1OnCommit(self):
-        self._storages = []
-        try:
-            self._checkCommitLock("tpc_finish", self._dosetup1, self._dowork1)
-        finally:
-            self._cleanup()
+    def myvote(self):
+        # The vote() call is synchronous, which makes it difficult to
+        # coordinate the action of multiple threads that all call
+        # vote().  This method sends the vote call, then sets the
+        # event saying vote was called, then waits for the vote
+        # response.  It digs deep into the implementation of the client.
+
+        # This method is a replacement for:
+        #     self.ready.set()
+        #     self.storage.tpc_vote(self.trans)
+
+        rpc = self.storage._server.rpc
+        msgid = rpc._deferred_call('vote', self.storage._serial)
+        self.ready.set()
+        rpc._deferred_wait(msgid)
+        self.storage._check_serials()
 
-    def checkCommitLock1OnAbort(self):
-        self._storages = []
-        try:
-            self._checkCommitLock("tpc_abort", self._dosetup1, self._dowork1)
-        finally:
-            self._cleanup()
+class CommitLockTests:
 
-    def checkCommitLock2OnCommit(self):
-        self._storages = []
-        try:
-            self._checkCommitLock("tpc_finish", self._dosetup2, self._dowork2)
-        finally:
-            self._cleanup()
+    NUM_CLIENTS = 5
 
-    def checkCommitLock2OnAbort(self):
-        self._storages = []
-        try:
-            self._checkCommitLock("tpc_abort", self._dosetup2, self._dowork2)
-        finally:
-            self._cleanup()
+    # The commit lock tests verify that the storage successfully
+    # blocks and restarts transactions when there is contention for a
+    # single storage.  There are a lot of cases to cover; see the
+    # individual checkCommitLock* methods below.
+
+    # The general flow of these tests is to start a transaction by
+    # getting far enough into 2PC to acquire the commit lock.  Then
+    # begin one or more other connections that also want to commit.
+    # This causes the commit lock code to be exercised.  Once the
+    # other connections are started, the first transaction completes.
 
     def _cleanup(self):
         for store, trans in self._storages:
@@ -105,77 +97,135 @@
             store.close()
         self._storages = []
 
-    def _checkCommitLock(self, method_name, dosetup, dowork):
-        # check the commit lock when a client attemps a transaction,
-        # but fails/exits before finishing the commit.
-
-        # The general flow of these tests is to start a transaction by
-        # calling tpc_begin().  Then begin one or more other
-        # connections that also want to commit.  This causes the
-        # commit lock code to be exercised.  Once the other
-        # connections are started, the first transaction completes.
-        # Either by commit or abort, depending on whether method_name
-        # is "tpc_finish."
-
-        # The tests are parameterized by method_name, dosetup(), and
-        # dowork().  The dosetup() function is called with a
-        # connectioned client storage, transaction, and timestamp.
-        # Any work it does occurs after the first transaction has
-        # started, but before it finishes.  The dowork() function
-        # executes after the first transaction has completed.
-
-        # Start on transaction normally and get the lock.
-        t = Transaction()
-        self._storage.tpc_begin(t)
+    def _start_txn(self):
+        txn = Transaction()
+        self._storage.tpc_begin(txn)
         oid = self._storage.new_oid()
-        self._storage.store(oid, ZERO, zodb_pickle(MinPO(1)), '', t)
-        self._storage.tpc_vote(t)
+        self._storage.store(oid, ZERO, zodb_pickle(MinPO(1)), '', txn)
+        return oid, txn
 
-        # Start a second transaction on a different connection without
-        # blocking the test thread.
-        self._storages = []
-        for i in range(4):
-            storage2 = self._duplicate_client()
-            t2 = Transaction()
-            tid = self._get_timestamp()
-            dosetup(storage2, t2, tid)
-            if i == 0:
-                storage2.close()
-            else:
-                self._storages.append((storage2, t2))
+    def checkCommitLockVoteFinish(self):
+        oid, txn = self._start_txn()
+        self._storage.tpc_vote(txn)
+
+        self._begin_threads()
 
-        if method_name == "tpc_finish":
-            self._storage.tpc_finish(t)
-            self._storage.load(oid, '')
-        else:
-            self._storage.tpc_abort(t)
+        self._storage.tpc_finish(txn)
+        self._storage.load(oid, '')
 
-        dowork(method_name)
+        self._finish_threads()
 
-        # Make sure the server is still responsive
         self._dostore()
+        self._cleanup()
+        
+    def checkCommitLockVoteAbort(self):
+        oid, txn = self._start_txn()
+        self._storage.tpc_vote(txn)
 
-    def _dosetup1(self, storage, trans, tid):
-        storage.tpc_begin(trans, tid)
+        self._begin_threads()
 
-    def _dowork1(self, method_name):
-        for store, trans in self._storages:
-            oid = store.new_oid()
-            store.store(oid, ZERO, zodb_pickle(MinPO("c")), '', trans)
-            store.tpc_vote(trans)
-            if method_name == "tpc_finish":
-                store.tpc_finish(trans)
-            else:
-                store.tpc_abort(trans)
+        self._storage.tpc_abort(txn)
 
-    def _dosetup2(self, storage, trans, tid):
+        self._finish_threads()
+
+        self._dostore()
+        self._cleanup()
+        
+    def checkCommitLockVoteClose(self):
+        oid, txn = self._start_txn()
+        self._storage.tpc_vote(txn)
+
+        self._begin_threads()
+
+        self._storage.close()
+
+        self._finish_threads()
+        self._cleanup()
+
+    def _get_trans_id(self):
+        self._dostore()
+        L = self._storage.undoInfo()
+        return L[0]['id']
+
+    def _begin_undo(self, trans_id):
+        rpc = self._storage._server.rpc
+        return rpc._deferred_call('transactionalUndo', trans_id,
+                                  self._storage._serial)
+
+    def _finish_undo(self, msgid):
+        return self._storage._server.rpc._deferred_wait(msgid)
+        
+    def checkCommitLockUndoFinish(self):
+        trans_id = self._get_trans_id()
+        oid, txn = self._start_txn()
+        msgid = self._begin_undo(trans_id)
+
+        self._begin_threads()
+
+        self._finish_undo(msgid)
+        self._storage.tpc_vote(txn)
+        self._storage.tpc_finish(txn)
+        self._storage.load(oid, '')
+
+        self._finish_threads()
+
+        self._dostore()
+        self._cleanup()
+        
+    def checkCommitLockUndoAbort(self):
+        trans_id = self._get_trans_id()
+        oid, txn = self._start_txn()
+        msgid = self._begin_undo(trans_id)
+
+        self._begin_threads()
+
+        self._finish_undo(msgid)
+        self._storage.tpc_vote(txn)
+        self._storage.tpc_abort(txn)
+
+        self._finish_threads()
+
+        self._dostore()
+        self._cleanup()
+        
+    def checkCommitLockUndoClose(self):
+        trans_id = self._get_trans_id()
+        oid, txn = self._start_txn()
+        msgid = self._begin_undo(trans_id)
+
+        self._begin_threads()
+
+        self._finish_undo(msgid)
+        self._storage.tpc_vote(txn)
+        self._storage.close()
+
+        self._finish_threads()
+
+        self._cleanup()
+        
+    def _begin_threads(self):
+        # Start a second transaction on a different connection without
+        # blocking the test thread.
+        self._storages = []
         self._threads = []
-        t = WorkerThread(self, storage, trans)
-        self._threads.append(t)
-        t.start()
-        t.ready.wait()
+        
+        for i in range(self.NUM_CLIENTS):
+            storage = self._duplicate_client()
+            txn = Transaction()
+            tid = self._get_timestamp()
+            
+            t = WorkerThread(self, storage, txn)
+            self._threads.append(t)
+            t.start()
+            t.ready.wait()
+        
+            # Close one of the connections abnormally to test server response
+            if i == 0:
+                storage.close()
+            else:
+                self._storages.append((storage, txn))
 
-    def _dowork2(self, method_name):
+    def _finish_threads(self):
         for t in self._threads:
             t.cleanup()