[Zope-Checkins] CVS: ZODB3/ZEO - ClientStorage.py:1.73.2.28 ServerStub.py:1.9.2.1 StorageServer.py:1.74.2.11

Jeremy Hylton jeremy at zope.com
Fri Aug 22 17:45:59 EDT 2003


Update of /cvs-repository/ZODB3/ZEO
In directory cvs.zope.org:/tmp/cvs-serv32661/ZEO

Modified Files:
      Tag: ZODB3-3_1-branch
	ClientStorage.py ServerStub.py StorageServer.py 
Log Message:
Apply several [hacks] from the ZODB3-vote-backoff-branch.

At least one of these is really an important fix.  The ZEO server was
holding onto ConflictErrors until after tpc_vote() was executed on the
storage.  That is far too expensive for a large txn.  With this change
the txn fails immediately instead.

Also, make two changes to tpc_vote().  

1. Add a mechanism to return a "VOTE_BACKOFF" message that instructs
   the client to try again later. This mechanism prevents the vote()
   call from blocking reads while the client waits.

2. If the actual txn being voted on is large (more than 25 stores),
   replay its stores in a separate thread to avoid blocking other
   clients.


=== ZODB3/ZEO/ClientStorage.py 1.73.2.27 => 1.73.2.28 ===
--- ZODB3/ZEO/ClientStorage.py:1.73.2.27	Mon Aug  4 18:13:58 2003
+++ ZODB3/ZEO/ClientStorage.py	Fri Aug 22 16:45:28 2003
@@ -727,7 +727,19 @@
         """Storage API: vote on a transaction."""
         if transaction is not self._transaction:
             return
-        self._server.vote(self._serial)
+        backoff = 0.24
+        attempts = 0
+        while 1:
+            if self._server.vote_nb(self._serial) == "VOTE_BACKOFF":
+                time.sleep(backoff)
+                attempts += 1
+                backoff *= 2
+                backoff = min(backoff, 60)
+                if attempts > 70:
+                    raise StorageSystemError("Timed out waiting to vote")
+                continue
+            else:
+                break
         return self._check_serials()
 
     def tpc_begin(self, transaction, tid=None, status=' '):


=== ZODB3/ZEO/ServerStub.py 1.9 => 1.9.2.1 ===
--- ZODB3/ZEO/ServerStub.py:1.9	Tue Oct  1 14:49:12 2002
+++ ZODB3/ZEO/ServerStub.py	Fri Aug 22 16:45:28 2003
@@ -81,6 +81,9 @@
     def vote(self, trans_id):
         return self.rpc.call('vote', trans_id)
 
+    def vote_nb(self, trans_id):
+        return self.rpc.call('vote_nb', trans_id)
+
     def tpc_finish(self, id):
         return self.rpc.call('tpc_finish', id)
 


=== ZODB3/ZEO/StorageServer.py 1.74.2.10 => 1.74.2.11 ===
--- ZODB3/ZEO/StorageServer.py:1.74.2.10	Tue Apr 29 17:39:56 2003
+++ ZODB3/ZEO/StorageServer.py	Fri Aug 22 16:45:28 2003
@@ -496,6 +496,13 @@
         self.check_tid(id, exc=StorageTransactionError)
         return self.strategy.tpc_vote()
 
+    def vote_nb(self, id):
+        self.check_tid(id, exc=StorageTransactionError)
+        if isinstance(self.strategy, DelayedCommitStrategy):
+            return self.strategy.tpc_vote_nb()
+        else:
+            return self.strategy.tpc_vote()
+
     def abortVersion(self, src, id):
         self.check_tid(id, exc=StorageTransactionError)
         return self.strategy.abortVersion(src)
@@ -562,11 +569,10 @@
         self.timeout.begin(self)
         self.strategy = self.ImmediateCommitStrategyClass(
             self.storage, self.client, self.log)
-        resp = old_strategy.restart(self.strategy)
-        if delay is not None:
-            delay.reply(resp)
-        else:
-            return resp
+        resp = old_strategy.prepare(self.strategy, delay)
+        # If delay is None, then we were called from restart_other()
+        # and the return value doesn't matter.
+        return resp
 
 # A ZEOStorage instance can use different strategies to commit a
 # transaction.  The current implementation uses different strategies
@@ -622,6 +628,7 @@
         self.client = client
         self.invalidated = []
         self.serials = []
+        self.store_failed = 0
         self.log = logmethod
 
     def tpc_begin(self, txn, tid, status):
@@ -631,6 +638,12 @@
     def tpc_vote(self):
         # send all the serialnos as a batch
         self.client.serialnos(self.serials)
+        if self.store_failed:
+            # If a store failed, then the serialnos() call above will
+            # send an exception to the client which will cause its
+            # vote() call to fail.  Don't call vote on the storage,
+            # because this txn will abort.
+            return
         return self.storage.tpc_vote(self.txn)
 
     def tpc_finish(self):
@@ -641,6 +654,8 @@
         self.storage.tpc_abort(self.txn)
 
     def store(self, oid, serial, data, version):
+        # Returns true if the store succeeded, false if it failed.
+        err = None
         try:
             newserial = self.storage.store(oid, serial, data, version,
                                            self.txn)
@@ -669,6 +684,7 @@
             if serial != "\0\0\0\0\0\0\0\0":
                 self.invalidated.append((oid, version))
         self.serials.append((oid, newserial))
+        return err is None
 
     def commitVersion(self, src, dest):
         oids = self.storage.commitVersion(src, dest, self.txn)
@@ -732,6 +748,12 @@
         self.args = ()
         return self.block()
 
+    def tpc_vote_nb(self):
+        if self.storage._transaction:
+            return "VOTE_BACKOFF"
+        else:
+            return self.tpc_vote()
+
     def commitVersion(self, src, dest):
         self.name = "commitVersion"
         self.args = src, dest
@@ -747,14 +769,36 @@
         self.args = trans_id,
         return self.block()
 
-    def restart(self, new_strategy):
-        # called by the storage when the storage is available
+    def prepare(self, new_strategy, delay):
+        # Called when the storage is available: acquires the lock
+        # and performs all the stores up to the method that actually
+        # blocked.  The prepare() method is a wrapper that decides
+        # whether to use another thread.  The real work is done in
+        # _prepare().
         assert isinstance(new_strategy, ImmediateCommitStrategy)
+        if self.log.stores > 25:
+            # If there are a lot of stores, fire off a separate thread
+            # to avoid blocking other clients.
+            return run_in_thread(self._prepare, new_strategy, delay=delay)
+        else:
+            r = self._prepare(new_strategy)
+            if delay is not None:
+                delay.reply(r)
+            else:
+                return r
+        
+    def _prepare(self, new_strategy):
+        # The new strategy will always be an ImmediateCommitStrategy.
         new_strategy.tpc_begin(self.txn, self.tid, self.status)
         loads, loader = self.log.get_loader()
         for i in range(loads):
             oid, serial, data, version = loader.load()
-            new_strategy.store(oid, serial, data, version)
+            if not new_strategy.store(oid, serial, data, version):
+                # Stop processing the log of stores now, because one
+                # has failed.  This transaction will fail when the
+                # client gets to vote.
+                new_strategy.store_failed = 1
+                break
         meth = getattr(new_strategy, self.name)
         return meth(*self.args)
 
@@ -836,8 +880,12 @@
                 time.sleep(howlong)
         self.trigger.close()
 
-def run_in_thread(method, *args):
-    t = SlowMethodThread(method, args)
+def run_in_thread(method, *args, **kw):
+    # support kw only to allow delay keyword arg to be used
+    delay = None
+    if kw:
+        delay = kw["delay"]
+    t = SlowMethodThread(method, args, delay)
     t.start()
     return t.delay
 
@@ -854,11 +902,11 @@
     # avoid blocking, we spawn a separate thread, return an MTDelay()
     # object, and have the thread reply() when it finishes.
 
-    def __init__(self, method, args):
+    def __init__(self, method, args, delay=None):
         threading.Thread.__init__(self)
         self._method = method
         self._args = args
-        self.delay = MTDelay()
+        self.delay = delay or MTDelay()
 
     def run(self):
         try:




More information about the Zope-Checkins mailing list