[Zodb-checkins] CVS: StandaloneZODB/ZEO - ClientCache.py:1.22 ClientStorage.py:1.39 StorageServer.py:1.35 asyncwrap.py:1.4 smac.py:1.15 start.py:1.30 zrpc.py:1.22

Jeremy Hylton jeremy@zope.com
Fri, 15 Mar 2002 00:11:53 -0500


Update of /cvs-repository/StandaloneZODB/ZEO
In directory cvs.zope.org:/tmp/cvs-serv27595

Modified Files:
	ClientCache.py ClientStorage.py StorageServer.py asyncwrap.py 
	smac.py start.py zrpc.py 
Log Message:
Merge changes from the zeo-1_0-branch onto the debug branch


=== StandaloneZODB/ZEO/ClientCache.py 1.21 => 1.22 ===


=== StandaloneZODB/ZEO/ClientStorage.py 1.38 => 1.39 ===
 """Network ZODB storage client
 """
-
 __version__='$Revision$'[11:-2]
 
 import struct, time, os, socket, string, Sync, zrpc, ClientCache


=== StandaloneZODB/ZEO/StorageServer.py 1.34 => 1.35 ===
-# 
-# Zope Public License (ZPL) Version 1.0
-# -------------------------------------
-# 
-# Copyright (c) Digital Creations.  All rights reserved.
-# 
-# This license has been certified as Open Source(tm).
-# 
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions are
-# met:
-# 
-# 1. Redistributions in source code must retain the above copyright
-#    notice, this list of conditions, and the following disclaimer.
-# 
-# 2. Redistributions in binary form must reproduce the above copyright
-#    notice, this list of conditions, and the following disclaimer in
-#    the documentation and/or other materials provided with the
-#    distribution.
-# 
-# 3. Digital Creations requests that attribution be given to Zope
-#    in any manner possible. Zope includes a "Powered by Zope"
-#    button that is installed by default. While it is not a license
-#    violation to remove this button, it is requested that the
-#    attribution remain. A significant investment has been put
-#    into Zope, and this effort will continue if the Zope community
-#    continues to grow. This is one way to assure that growth.
-# 
-# 4. All advertising materials and documentation mentioning
-#    features derived from or use of this software must display
-#    the following acknowledgement:
-# 
-#      "This product includes software developed by Digital Creations
-#      for use in the Z Object Publishing Environment
-#      (http://www.zope.org/)."
-# 
-#    In the event that the product being advertised includes an
-#    intact Zope distribution (with copyright and license included)
-#    then this clause is waived.
-# 
-# 5. Names associated with Zope or Digital Creations must not be used to
-#    endorse or promote products derived from this software without
-#    prior written permission from Digital Creations.
-# 
-# 6. Modified redistributions of any form whatsoever must retain
-#    the following acknowledgment:
-# 
-#      "This product includes software developed by Digital Creations
-#      for use in the Z Object Publishing Environment
-#      (http://www.zope.org/)."
-# 
-#    Intact (re-)distributions of any official Zope release do not
-#    require an external acknowledgement.
-# 
-# 7. Modifications are encouraged but must be packaged separately as
-#    patches to official Zope releases.  Distributions that do not
-#    clearly separate the patches from the original work must be clearly
-#    labeled as unofficial distributions.  Modifications which do not
-#    carry the name Zope may be packaged in any form, as long as they
-#    conform to all of the clauses above.
-# 
-# 
-# Disclaimer
-# 
-#   THIS SOFTWARE IS PROVIDED BY DIGITAL CREATIONS ``AS IS'' AND ANY
-#   EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
-#   IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
-#   PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL DIGITAL CREATIONS OR ITS
-#   CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-#   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-#   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
-#   USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
-#   ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
-#   OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
-#   OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
-#   SUCH DAMAGE.
-# 
-# 
-# This software consists of contributions made by Digital Creations and
-# many individuals on behalf of Digital Creations.  Specific
-# attributions are listed in the accompanying credits file.
+##############################################################################
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+# 
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE
 # 
 ##############################################################################
 
@@ -99,6 +28,7 @@
 from cStringIO import StringIO
 from ZEO import trigger
 from ZEO import asyncwrap
+from ZEO.smac import Disconnected
 from types import StringType
 
 class StorageServerError(POSException.StorageError): pass
@@ -133,6 +63,8 @@
         self.__storages=storages
         for n, s in storages.items():
             init_storage(s)
+            # Create a waiting list to support the distributed commit lock.
+            s._waiting = []
 
         self.__connections={}
         self.__get_connections=self.__connections.get
@@ -280,6 +212,7 @@
             # This is the first communication from the client
             self.__storage, self.__storage_id = (
                 self.__server.register_connection(self, message))
+
             # Send info back asynchronously, so client need not ask
             self.message_output('S'+dump(self.get_info(), 1))
             return
@@ -501,39 +434,76 @@
             return oids
         return ()
 
-    def tpc_abort(self, id):
-        t=self._transaction
-        if t is None or id != t.id: return
-        r=self.__storage.tpc_abort(t)
+    # distributed commit lock support methods
 
-        storage=self.__storage
-        try: waiting=storage.__waiting
-        except: waiting=storage.__waiting=[]
+    # Only one client at a time can commit a transaction on a
+    # storage.  If one client is committing a transaction, and a
+    # second client sends a tpc_begin(), then second client is queued.
+    # When the first transaction finishes, either by abort or commit,
+    # the request from the queued client must be handled.
+
+    # It is important that this code be robust.  If a queued
+    # transaction is not restarted, the server will stop processing
+    # new transactions.
+
+    # This lock is implemented by storing the queued requests in a
+    # list on the storage object.  The list contains:
+    #     a callable object to resume request
+    #     arguments to that object
+    #     a callable object to handle errors during resume
+
+    # XXX I am not sure that the commitlock_resume() method is
+    # sufficiently paranoid.
+
+    def commitlock_suspend(self, resume, args, onerror):
+        self.__storage._waiting.append((resume, args, onerror))
+
+    def commitlock_resume(self):
+        waiting = self.__storage._waiting
         while waiting:
-            f, args = waiting.pop(0)
-            if apply(f,args): break
+            resume, args, onerror = waiting.pop(0)
+            try:
+                if apply(resume, args):
+                    break
+            except Disconnected:
+                # A disconnected error isn't an unexpected error.
+                # There should be no need to log it, because the
+                # disconnect will have generated its own log event.
+                onerror()
+            except:
+                LOG('ZEO Server', ERROR,
+                    "Unexpected error handling queued tpc_begin()",
+                    error=sys.exc_info())
+                onerror()
 
-        self._transaction=None
-        self.__invalidated=[]
+    def tpc_abort(self, id):
+        t = self._transaction
+        if t is None or id != t.id:
+            return
+        r = self.__storage.tpc_abort(t)
+
+        self._transaction = None
+        self.__invalidated = []
+        self.commitlock_resume()
         
     def unlock(self):
-        if self.__closed: return
+        if self.__closed:
+            return
         self.message_output('UN.')
 
     def tpc_begin(self, id, user, description, ext):
-        t=self._transaction
+        t = self._transaction
         if t is not None:
-            if id == t.id: return
+            if id == t.id:
+                return
             else:
                 raise StorageServerError(
                     "Multiple simultaneous tpc_begin requests from the same "
                     "client."
                     )
-        storage=self.__storage
+        storage = self.__storage
         if storage._transaction is not None:
-            try: waiting=storage.__waiting
-            except: waiting=storage.__waiting=[]
-            waiting.append((self.unlock, ()))
+            self.commitlock_suspend(self.unlock, (), self.close)
             return 1 # Return a flag indicating a lock condition.
             
         self._transaction=t=Transaction()
@@ -552,9 +522,9 @@
         if storage._transaction is None:
             self.try_again_sync(id, user, description, ext)
         else:
-            try: waiting=storage.__waiting
-            except: waiting=storage.__waiting=[]
-            waiting.append((self.try_again_sync, (id, user, description, ext)))
+            self.commitlock_suspend(self.try_again_sync,
+                                    (id, user, description, ext),
+                                    self.close)
 
         return _noreturn
         
@@ -572,24 +542,21 @@
         return 1
 
     def tpc_finish(self, id, user, description, ext):
-        t=self._transaction
-        if id != t.id: return
+        t = self._transaction
+        if id != t.id:
+            return
 
-        storage=self.__storage
-        r=storage.tpc_finish(t)
-        
-        try: waiting=storage.__waiting
-        except: waiting=storage.__waiting=[]
-        while waiting:
-            f, args = waiting.pop(0)
-            if apply(f,args): break
+        storage = self.__storage
+        r = storage.tpc_finish(t)
 
-        self._transaction=None
+        self._transaction = None
         if self.__invalidated:
             self.__server.invalidate(self, self.__storage_id,
                                      self.__invalidated,
                                      self.get_size_info())
-            self.__invalidated=[]
+            self.__invalidated = []
+            
+        self.commitlock_resume()
 
 def init_storage(storage):
     if not hasattr(storage,'tpc_vote'): storage.tpc_vote=lambda *args: None


=== StandaloneZODB/ZEO/asyncwrap.py 1.3 => 1.4 ===
 # 
 ##############################################################################
-
 """A wrapper for asyncore that provides robust exception handling.
 
 The poll() and loop() calls exported by asyncore can raise exceptions.
@@ -31,6 +30,10 @@
 it would be useful to extend this module with wrappers for those
 errors.
 """
+
+# XXX The current implementation requires Python 2.0.  Not sure if
+# that's acceptable, depends on how many users want to combine ZEO 1.0
+# and Zope 2.3.
 
 import asyncore
 import errno


=== StandaloneZODB/ZEO/smac.py 1.14 => 1.15 ===


=== StandaloneZODB/ZEO/start.py 1.29 => 1.30 ===
     else:
         # Hm, lets at least try to take care of the stupid logger:
-        if hasattr(zLOG, '_set_stupid_dest'):
-            zLOG._set_stupid_dest(None)
-        else:
-            zLOG._stupid_dest = None
+        zLOG._stupid_dest=None
 
 def rotate_logs_handler(signum, frame):
     rotate_logs()


=== StandaloneZODB/ZEO/zrpc.py 1.21 => 1.22 ===