[Zope-Checkins] CVS: ZODB3/ZEO - StorageServer.py:1.74.2.8 start.py:1.45.6.4
   
    Guido van Rossum
     
    guido@python.org
       
    Mon, 20 Jan 2003 16:54:59 -0500
    
    
  
Update of /cvs-repository/ZODB3/ZEO
In directory cvs.zope.org:/tmp/cvs-serv6796/ZEO
Modified Files:
      Tag: ZODB3-3_1-branch
	StorageServer.py start.py 
Log Message:
Sneak a new feature in the ZODB 3.1.1 release branch: transaction
timeouts.  This is a backport of the same feature in ZODB 3.2.
Why backport a feature?  We absolutely need this for a customer
project that's stuck on ZODB 3.1.x indefinitely; maintaining a brach
off the 3.1.1 release branch indefinitely would be a branching
nightmare.  The feature is not enabled by default, and when not
enabled it doesn't change anything.  So this should be safe.
=== ZODB3/ZEO/StorageServer.py 1.74.2.7 => 1.74.2.8 ===
--- ZODB3/ZEO/StorageServer.py:1.74.2.7	Wed Dec 18 16:11:22 2002
+++ ZODB3/ZEO/StorageServer.py	Mon Jan 20 16:54:26 2003
@@ -24,12 +24,14 @@
 import cPickle
 import os
 import sys
+import time
 import threading
 
 from ZEO import ClientStub
 from ZEO.CommitLog import CommitLog
 from ZEO.zrpc.server import Dispatcher
 from ZEO.zrpc.connection import ManagedServerConnection, Delay, MTDelay
+from ZEO.zrpc.trigger import trigger
 
 import zLOG
 from ZODB.POSException import StorageError, StorageTransactionError
@@ -67,7 +69,8 @@
     ZEOStorageClass = None # patched up later
     ManagedServerConnectionClass = ManagedServerConnection
 
-    def __init__(self, addr, storages, read_only=0):
+    def __init__(self, addr, storages, read_only=0,
+                 transaction_timeout=None):
 
         """StorageServer constructor.
 
@@ -95,6 +98,11 @@
             allowed, even if the storages are writable.  Note that
             pack() is considered a read-only operation.
 
+        transaction_timeout -- The maximum amount of time to wait for
+            a transaction to commit after acquiring the storage lock.
+            If the transaction takes too long, the client connection
+            will be closed and the transaction aborted.
+
         """
 
         self.addr = addr
@@ -112,6 +120,15 @@
         self.dispatcher = self.DispatcherClass(addr,
                                                factory=self.new_connection,
                                                reuse_addr=1)
+        self.timeouts = {}
+        for name in self.storages.keys():
+            if transaction_timeout is None:
+                # An object with no-op methods
+                timeout = StubTimeoutThread()
+            else:
+                timeout = TimeoutThread(transaction_timeout)
+                timeout.start()
+            self.timeouts[name] = timeout
 
     def new_connection(self, sock, addr):
         """Internal: factory to create a new connection.
@@ -139,6 +156,7 @@
         if l is None:
             l = self.connections[storage_id] = []
         l.append(conn)
+        return self.timeouts[storage_id]
 
     def invalidate(self, conn, storage_id, invalidated=(), info=None):
         """Internal: broadcast info and invalidations to clients.
@@ -204,6 +222,7 @@
     ImmediateCommitStrategyClass = None # patched up later
 
     def __init__(self, server, read_only=0):
+        self.timeout = None # Will be initialized in register()
         self.server = server
         self.connection = None
         self.client = None
@@ -211,6 +230,7 @@
         self.storage_id = "uninitialized"
         self.transaction = None
         self.read_only = read_only
+        self.strategy = None
         self.log_label = _label
 
     def notifyConnected(self, conn):
@@ -295,7 +315,7 @@
         self.storage_id = storage_id
         self.storage = storage
         self.setup_delegation()
-        self.server.register_connection(storage_id, self)
+        self.timeout = self.server.register_connection(storage_id, self)
 
     def get_info(self):
         return {'length': len(self.storage),
@@ -439,23 +459,29 @@
         if invalidated:
             self.server.invalidate(self, self.storage_id,
                                    invalidated, self.get_size_info())
-        self.transaction = None
-        self.strategy = None
-        self.handle_waiting()
+        self._clear_transaction()
 
     def tpc_abort(self, id):
         if not self.check_tid(id):
             return
         strategy = self.strategy
         strategy.tpc_abort()
+        self._clear_transaction()
+ 
+    def _clear_transaction(self):
+        # Common code at end of tpc_finish() and tpc_abort()
         self.transaction = None
         self.strategy = None
+        self.timeout.end(self)
+        # handle_waiting() can start another transaction (by
+        # restarting a waiting one) so must be done last
         self.handle_waiting()
 
     def abort(self):
         strategy = self.strategy
         self.transaction = None
         self.strategy = None
+        self.timeout.end(self)
         strategy.abort(self)
 
     # XXX handle new serialnos
@@ -531,9 +557,9 @@
     def restart(self, delay=None):
         old_strategy = self.strategy
         assert isinstance(old_strategy, DelayedCommitStrategy)
-        self.strategy = ImmediateCommitStrategy(self.storage,
-                                                self.client,
-                                                self.log)
+        self.timeout.begin(self)
+        self.strategy = self.ImmediateCommitStrategyClass(
+            self.storage, self.client, self.log)
         resp = old_strategy.restart(self.strategy)
         if delay is not None:
             delay.reply(resp)
@@ -738,6 +764,75 @@
             if z is zeo_storage:
                 del waiting[i]
                 break
+
+class StubTimeoutThread:
+
+    def begin(self, client):
+        pass
+
+    def end(self, client):
+        pass
+
+class TimeoutThread(threading.Thread):
+    """Monitors transaction progress and generates timeouts."""
+
+    # There is one TimeoutThread per storage, because there's one
+    # transaction lock per storage.
+
+    def __init__(self, timeout):
+        threading.Thread.__init__(self)
+        self.setDaemon(1)
+        self._timeout = timeout
+        self._client = None
+        self._deadline = None
+        self._cond = threading.Condition() # Protects _client and _deadline
+        self._trigger = trigger()
+
+    def begin(self, client):
+        # Called from the restart code the "main" thread, whenever the
+        # storage lock is being acquired.  (Serialized by asyncore.)
+        self._cond.acquire()
+        try:
+            assert self._client is None
+            self._client = client
+            self._deadline = time.time() + self._timeout
+            self._cond.notify()
+        finally:
+            self._cond.release()
+
+    def end(self, client):
+        # Called from the "main" thread whenever the storage lock is
+        # being released.  (Serialized by asyncore.)
+        self._cond.acquire()
+        try:
+            if self._client is client:
+                self._client = None
+                self._deadline = None
+        finally:
+            self._cond.release()
+
+    def run(self):
+        # Code running in the thread.
+        while 1:
+            self._cond.acquire()
+            try:
+                while self._deadline is None:
+                    self._cond.wait()
+                howlong = self._deadline - time.time()
+                if howlong <= 0:
+                    # Prevent reporting timeout more than once
+                    self._deadline = None
+                client = self._client # For the howlong <= 0 branch far below
+            finally:
+                self._cond.release()
+            if howlong <= 0:
+                client.log("Transaction timeout after %s seconds" %
+                           self._timeout)
+                self._trigger.pull_trigger(
+                    lambda client=client: client.connection.close())
+            else:
+                time.sleep(howlong)
+        self.trigger.close()
 
 def run_in_thread(method, *args):
     t = SlowMethodThread(method, args)
=== ZODB3/ZEO/start.py 1.45.6.3 => 1.45.6.4 ===
--- ZODB3/ZEO/start.py:1.45.6.3	Fri Dec 20 15:04:01 2002
+++ ZODB3/ZEO/start.py	Mon Jan 20 16:54:26 2003
@@ -152,6 +152,8 @@
             attr_name -- This is the name to which the storage object
               is assigned in the module.
 
+       -t timeout -- transaction timeout in seconds (default no timeout)
+
        -P file -- Run under profile and dump output to file.  Implies the
           -s flag.
 
@@ -159,7 +161,7 @@
     """ % (me, env.fs)
 
     try:
-        opts, args = getopt.getopt(args, 'p:Dh:U:sS:u:P:d')
+        opts, args = getopt.getopt(args, 'p:Dh:U:sS:t:u:P:d')
     except getopt.error, msg:
         print usage
         print msg
@@ -174,6 +176,7 @@
     prof = None
     detailed = 0
     fs = None
+    transaction_timeout = None
     for o, v in opts:
         if o =='-p':
             port = int(v)
@@ -189,6 +192,8 @@
             detailed = 1
         elif o =='-s':
             Z = 0
+        elif o =='-t':
+            transaction_timeout = float(v)
         elif o =='-P':
             prof = v
 
@@ -263,7 +268,8 @@
         if not unix:
             unix = host, port
 
-        ZEO.StorageServer.StorageServer(unix, storages)
+        ZEO.StorageServer.StorageServer(
+            unix, storages, transaction_timeout=transaction_timeout)
 
         if not Z:
             try: