[Zodb-checkins] CVS: ZODB3/ZEO - ClientStorage.py:1.73.2.31 ClientStub.py:1.10.2.2 StorageServer.py:1.74.2.12

Jeremy Hylton jeremy at zope.com
Wed Aug 27 01:38:52 EDT 2003


Update of /cvs-repository/ZODB3/ZEO
In directory cvs.zope.org:/tmp/cvs-serv27274/ZEO

Modified Files:
      Tag: ZODB3-3_1-branch
	ClientStorage.py ClientStub.py StorageServer.py 
Log Message:
New design for the vote_nb() method used to vote without blocking.

Instead of getting a message that tells the client to try again later,
the client will get a callback when it is that client's turn to try
again.  The callback is integrated with the queue used for blocked
transactions, i.e. for clients that use the vote() call instead of
vote_nb().  The primary advantage of the new design is fairness:  Each
client's transaction is handled in the order in which the vote calls
are received.

The implementation still has two rough edges mentioned in the XXX
comments in tpc_vote().

It passes all the unit tests on Linux.


=== ZODB3/ZEO/ClientStorage.py 1.73.2.30 => 1.73.2.31 ===
--- ZODB3/ZEO/ClientStorage.py:1.73.2.30	Tue Aug 26 13:53:02 2003
+++ ZODB3/ZEO/ClientStorage.py	Wed Aug 27 00:38:51 2003
@@ -219,6 +219,7 @@
         self._connection = None
         self._pending_server = None
         self._ready = threading.Event()
+        self._vote_ready = threading.Event()
 
         # _is_read_only stores the constructor argument
         self._is_read_only = read_only
@@ -728,20 +729,25 @@
         from random import random
         if transaction is not self._transaction:
             return
-        max_backoff = 0.5   # seconds
-        total_backoff = 0.0
-        while 1:
-            if self._server.vote_nb(self._serial) == "VOTE_BACKOFF":
-                if total_backoff >= 3600.0: # waited an hour already
-                    raise StorageSystemError("Timed out waiting to vote")
-                backoff = random() * max_backoff
-                time.sleep(backoff)
-                total_backoff += backoff
-                max_backoff = min(max_backoff * 1.5, 60)
-            else:
-                break
+        # XXX deal with disconnection while waiting?
+        if self._server.vote_nb(self._serial) == "VOTE_WAIT":
+            deadline = time.time() + 3600
+            while not self._vote_ready.isSet():
+                log2(BLATHER, "Waiting for storage to call voteReady")
+                remaining = deadline - time.time()
+                if remaining <= 0:
+                    raise StorageTransactionError("Timed out waiting to vote")
+                self._connection.pending(remaining)
+            log2(BLATHER, "Received voteReady notification from server")
+            # XXX If the client would fail to call vote, the server would
+            # block until it timed the connection out.  Is that a likely
+            # problem?
+            self._server.vote(self._serial)
         return self._check_serials()
 
+    def voteReady(self):
+        self._vote_ready.set()
+
     def tpc_begin(self, transaction, tid=None, status=' '):
         """Storage API: begin a transaction."""
         if self._is_read_only:
@@ -758,6 +764,7 @@
             self._tpc_cond.wait(30)
         self._transaction = transaction
         self._tpc_cond.release()
+        self._vote_ready.clear()
 
         if tid is None:
             self._ts = get_timestamp(self._ts)


=== ZODB3/ZEO/ClientStub.py 1.10.2.1 => 1.10.2.2 ===
--- ZODB3/ZEO/ClientStub.py:1.10.2.1	Thu Jun  5 18:42:36 2003
+++ ZODB3/ZEO/ClientStub.py	Wed Aug 27 00:38:51 2003
@@ -59,6 +59,9 @@
         # mainloop and do the I/O when all the invalidations are sent.
         self.rpc.callAsyncNoPoll('Invalidate', args)
 
+    def voteReady(self):
+        self.rpc.callAsync('voteReady')
+
     def serialnos(self, arg):
         self.rpc.callAsync('serialnos', arg)
 


=== ZODB3/ZEO/StorageServer.py 1.74.2.11 => 1.74.2.12 ===
--- ZODB3/ZEO/StorageServer.py:1.74.2.11	Fri Aug 22 16:45:28 2003
+++ ZODB3/ZEO/StorageServer.py	Wed Aug 27 00:38:51 2003
@@ -213,6 +213,45 @@
             if conn.obj in cl:
                 cl.remove(conn.obj)
 
+# Each storage has a waiting queue.  The queue contains two kinds of
+# entries: blocked and non-blocking entries.  A blocked entry is made
+# by a client that is expecting a response to its vote() before
+# continuing.  A non-blocking entry is waiting for an
+# asynchronous message to tell it to continue.  The restart() method
+# of an entry will restart it appropriately.
+
+class WaitQueueEntry:
+    def __init__(self, delay, client):
+        self.delay = delay
+        self.client = client
+
+    def restart(self, other):
+        try:
+            self.client.restart(self.delay)
+        except:
+            other.log("Unexpected error handling waiting transaction",
+                      level=zLOG.WARNING, error=sys.exc_info())
+            self.client.connection.close()
+            return 0
+        else:
+            return 1
+
+class NBQueueEntry:
+    def __init__(self, client):
+        self.client = client
+
+    def restart(self, other):
+        other.log("Calling voteRready")
+        try:
+            self.client.voteReady()
+        except:
+            other.log("Unexpected error handling queued transaction",
+                      level=zLOG.WARNING, error=sys.exc_info())
+            self.client.connection.close()
+            return 0
+        else:
+            return 1
+
 class ZEOStorage:
     """Proxy to underlying storage for a single remote client."""
 
@@ -235,7 +274,7 @@
         self.log_label = _label
 
     def notifyConnected(self, conn):
-        self.connection = conn # For restart_other() below
+        self.connection = conn # For WaitQueueEntry objects
         self.client = self.ClientStorageStubClass(conn)
         addr = conn.addr
         if isinstance(addr, type("")):
@@ -443,7 +482,8 @@
 
         # (This doesn't require a lock because we're using asyncore)
         self.strategy = self.DelayedCommitStrategyClass(self.storage,
-                                                        self.wait)
+                                                        self.wait,
+                                                        self.queue)
 
         t = Transaction()
         t.id = id
@@ -528,50 +568,44 @@
     def wait(self):
         if self.storage._transaction:
             d = Delay()
-            self.storage._waiting.append((d, self))
+            self.storage._waiting.append(WaitQueueEntry(d, self))
             self.log("Transaction blocked waiting for storage. "
                      "Clients waiting: %d." % len(self.storage._waiting))
             return d
         else:
             return self.restart()
 
+    def queue(self):
+        if self.storage._transaction:
+            self.storage._waiting.append(NBQueueEntry(self.client))
+            self.log("Transaction queued waiting for storage. "
+                     "Clients waiting: %d." % len(self.storage._waiting))
+
     def dontwait(self):
         return self.restart()
 
     def handle_waiting(self):
         while self.storage._waiting:
-            delay, zeo_storage = self.storage._waiting.pop(0)
-            if self.restart_other(zeo_storage, delay):
+            entry = self.storage._waiting.pop(0)
+            if entry.restart(self):
                 if self.storage._waiting:
                     n = len(self.storage._waiting)
-                    self.log("Blocked transaction restarted.  "
+                    self.log("Transaction restarted.  "
                              "Clients waiting: %d" % n)
                 else:
-                    self.log("Blocked transaction restarted.")
+                    self.log("Transaction restarted.")
                 return
 
-    def restart_other(self, zeo_storage, delay):
-        # Return True if the server restarted.
-        # call the restart() method on the appropriate server.
-        try:
-            zeo_storage.restart(delay)
-        except:
-            self.log("Unexpected error handling waiting transaction",
-                     level=zLOG.WARNING, error=sys.exc_info())
-            zeo_storage.connection.close()
-            return 0
-        else:
-            return 1
-
     def restart(self, delay=None):
+        # Create a new ImmediateCommitStrategy and prepare it.
         old_strategy = self.strategy
         assert isinstance(old_strategy, DelayedCommitStrategy)
         self.timeout.begin(self)
         self.strategy = self.ImmediateCommitStrategyClass(
             self.storage, self.client, self.log)
         resp = old_strategy.prepare(self.strategy, delay)
-        # If delay is None, then we were called from restart_other()
-        # and the return value doesn't matter.
+        # If delay is None, then we were called from the restart()
+        # method of a WaitQueueEntry and the return value doesn't matter.
         return resp
 
 # A ZEOStorage instance can use different strategies to commit a
@@ -714,10 +748,11 @@
 class DelayedCommitStrategy:
     """The storage is unavailable, so log to a file."""
 
-    def __init__(self, storage, block):
+    def __init__(self, storage, block, queue):
         # the block argument is called when we can't delay any longer
         self.storage = storage
         self.block = block
+        self.queue = queue
         self.log = CommitLog()
 
         # Store information about the call that blocks
@@ -750,7 +785,8 @@
 
     def tpc_vote_nb(self):
         if self.storage._transaction:
-            return "VOTE_BACKOFF"
+            self.queue()
+            return "VOTE_WAIT"
         else:
             return self.tpc_vote()
 




More information about the Zodb-checkins mailing list