[Zope-Checkins] CVS: ZODB3/ZEO - StorageServer.py:1.74.2.3

Jeremy Hylton jeremy@zope.com
Wed, 30 Oct 2002 16:42:10 -0500


Update of /cvs-repository/ZODB3/ZEO
In directory cvs.zope.org:/tmp/cvs-serv1434/ZEO

Modified Files:
      Tag: ZODB3-3_1-branch
	StorageServer.py 
Log Message:
Backport from trunk: New timeout feature.

Before we do a maintenance release, we should also backport the
configuration option to enable / disable the timeout.


=== ZODB3/ZEO/StorageServer.py 1.74.2.2 => 1.74.2.3 ===
--- ZODB3/ZEO/StorageServer.py:1.74.2.2	Fri Oct  4 15:49:53 2002
+++ ZODB3/ZEO/StorageServer.py	Wed Oct 30 16:42:10 2002
@@ -25,6 +25,7 @@
 import os
 import sys
 import threading
+import time
 
 from ZEO import ClientStub
 from ZEO.CommitLog import CommitLog
@@ -210,9 +211,12 @@
         self.storage_id = "uninitialized"
         self.transaction = None
         self.read_only = read_only
+        self.timeout = TimeoutThread()
+        self.timeout.start()
 
     def notifyConnected(self, conn):
         self.client = self.ClientStorageStubClass(conn)
+        self.timeout.notifyConnected(conn)
 
     def notifyDisconnected(self):
         # When this storage closes, we must ensure that it aborts
@@ -222,6 +226,7 @@
             self.abort()
         else:
             self.log("disconnected")
+        self.timeout.notifyDisconnected()
 
     def __repr__(self):
         tid = self.transaction and repr(self.transaction.id)
@@ -393,6 +398,7 @@
         if self.storage._transaction is None:
             self.strategy = self.ImmediateCommitStrategyClass(self.storage,
                                                               self.client)
+            self.timeout.begin()
         else:
             self.strategy = self.DelayedCommitStrategyClass(self.storage,
                                                             self.wait)
@@ -409,6 +415,7 @@
     def tpc_finish(self, id):
         if not self.check_tid(id):
             return
+        self.timeout.end()
         invalidated = self.strategy.tpc_finish()
         if invalidated:
             self.server.invalidate(self, self.storage_id,
@@ -420,6 +427,7 @@
     def tpc_abort(self, id):
         if not self.check_tid(id):
             return
+        self.timeout.end()
         strategy = self.strategy
         strategy.tpc_abort()
         self.transaction = None
@@ -440,7 +448,9 @@
 
     def vote(self, id):
         self.check_tid(id, exc=StorageTransactionError)
-        return self.strategy.tpc_vote()
+        r = self.strategy.tpc_vote()
+        self.timeout.begin()
+        return r
 
     def abortVersion(self, src, id):
         self.check_tid(id, exc=StorageTransactionError)
@@ -736,6 +746,79 @@
             self.delay.error(sys.exc_info())
         else:
             self.delay.reply(result)
+
+class TimeoutThread(threading.Thread):
+    # A TimeoutThread is associated with a ZEOStorage.  It tracks
+    # how long transactions take to commit.  If a transaction takes
+    # too long, the thread closes the connection.
+
+    TIMEOUT = 30
+
+    def __init__(self):
+        threading.Thread.__init__(self)
+        self._lock = threading.Lock()
+        self._timestamp = None
+        self._conn = None
+
+    def begin(self):
+        self._lock.acquire()
+        try:
+            self._timestamp = time.time()
+        finally:
+            self._lock.release()
+
+    def end(self):
+        self._lock.acquire()
+        try:
+            self._timestamp = None
+        finally:
+            self._lock.release()
+
+    # There's a race here, but I hope it is harmless.
+
+    def notifyConnected(self, conn):
+        self._conn = conn
+
+    def notifyDisconnected(self):
+        self._conn = None
+
+    def run(self):
+        timeout = self.TIMEOUT
+        while self._conn is not None:
+            time.sleep(timeout)
+            
+            self._lock.acquire()
+            try:
+                if self._timestamp is not None:
+                    deadline = self._timestamp + self.TIMEOUT
+                else:
+                    log("TimeoutThread no current transaction",
+                        zLOG.BLATHER)
+                    timeout = self.TIMEOUT
+                    continue
+            finally:
+                self._lock.release()
+                
+            timeout = deadline - time.time()
+            if deadline < time.time():
+                self._abort()
+                break
+            else:
+                elapsed = self.TIMEOUT - timeout
+                log("TimeoutThread transaction has %0.2f sec to complete"
+                    " (%.2f elapsed)" % (timeout, elapsed), zLOG.BLATHER)
+        log("TimeoutThread exiting.  Connection closed.", zLOG.BLATHER)
+
+    def _abort(self):
+        # It's possible for notifyDisconnected to remove the connection
+        # just before we use it.  I think that's harmless, since it means
+        # the connection was closed.
+        log("TimeoutThread aborting transaction", zLOG.WARNING)
+        try:
+            self._conn.close()
+        except AttributeError, msg:
+            log(msg)
+
 
 # Patch up class references
 StorageServer.ZEOStorageClass = ZEOStorage