[Zope-Checkins] CVS: ZODB3/ZODB - Transaction.py:1.39.2.1 DB.py:1.43.8.1 Connection.py:1.76.4.2 ConflictResolution.py:1.14.2.2 BaseStorage.py:1.24.2.2

Jeremy Hylton jeremy@zope.com
Tue, 12 Nov 2002 15:18:40 -0500


Update of /cvs-repository/ZODB3/ZODB
In directory cvs.zope.org:/tmp/cvs-serv20159/ZODB

Modified Files:
      Tag: ZODB3-3_1-branch
	Transaction.py DB.py Connection.py ConflictResolution.py 
	BaseStorage.py 
Log Message:
Merge the deadlock debug code to the release branch.


=== ZODB3/ZODB/Transaction.py 1.39 => 1.39.2.1 ===
--- ZODB3/ZODB/Transaction.py:1.39	Fri Sep 27 14:37:24 2002
+++ ZODB3/ZODB/Transaction.py	Tue Nov 12 15:18:09 2002
@@ -19,12 +19,34 @@
 import time, sys, struct, POSException
 from struct import pack
 from string import split, strip, join
-from zLOG import LOG, ERROR, PANIC
+from zLOG import LOG, ERROR, PANIC, INFO, BLATHER, WARNING
 from POSException import ConflictError
+from ZODB import utils
 
 # Flag indicating whether certain errors have occurred.
 hosed=0
 
+# There is an order imposed on all jars, based on the storages they
+# serve, that must be consistent across all applications using the
+# storages.  The order is defined by the sortKey() method of the jar.
+
+def jar_cmp(j1, j2):
+    # Call sortKey() every time, because a ZEO client could reconnect
+    # to a different server at any time.
+    try:
+        k1 = j1.sortKey()
+    except:
+        LOG("TM", WARNING, "jar missing sortKey() method: %s" % j1)
+        k1 = id(j1)
+
+    try:
+        k2 = j2.sortKey()
+    except:
+        LOG("TM", WARNING, "jar missing sortKey() method: %s" % j2)
+        k2 = id(j2)
+        
+    return cmp(k1, k2)
+
 class Transaction:
     'Simple transaction objects for single-threaded applications.'
     user=''
@@ -53,6 +75,9 @@
             for c in self._connections.values(): c.close()
             del self._connections
 
+    def log(self, msg, level=INFO, error=None):
+        LOG("TM:%s" % self._id, level, msg, error=error)
+
     def sub(self):
         # Create a manually managed subtransaction for internal use
         r=self.__class__()
@@ -84,11 +109,8 @@
                 """)
 
         t = None
-        subj = self._sub
-        subjars = ()
 
         if not subtransaction:
-
             # Must add in any non-subtransaction supporting objects that
             # may have been stowed away from previous subtransaction
             # commits.
@@ -96,11 +118,14 @@
                 self._objects.extend(self._non_st_objects)
                 self._non_st_objects = None
 
-            if subj is not None:
+            if self._sub is not None:
                 # Abort of top-level transaction after commiting
                 # subtransactions.
-                subjars = subj.values()
+                subjars = self._sub.values()
+                subjars.sort(jar_cmp)
                 self._sub = None
+            else:
+                subjars = []
 
         try:
             # Abort the objects
@@ -110,13 +135,20 @@
                     if j is not None:
                         j.abort(o, self)
                 except:
+                    # Record the first exception that occurred
                     if t is None:
                         t, v, tb = sys.exc_info()
+                    else:
+                        self.log("Failed to abort object %016x" %
+                                 utils.U64(o._p_oid), error=sys.exc_info())
 
-            # Ugh, we need to abort work done in sub-transactions.
-            while subjars:
-                j = subjars.pop()
-                j.abort_sub(self) # This should never fail
+            # tpc_begin() was never called, so tpc_abort() should not be
+            # called.
+
+            if not subtransaction:
+                # abort_sub() must be called to clear subtransaction state
+                for jar in subjars:
+                    jar.abort_sub(self) # This should never fail
 
             if t is not None:
                 raise t, v, tb
@@ -136,7 +168,8 @@
 
         This aborts any transaction in progres.
         '''
-        if self._objects: self.abort(subtransaction, 0)
+        if self._objects:
+            self.abort(subtransaction, 0)
         if info:
             info=split(info,'\t')
             self.user=strip(info[0])
@@ -146,30 +179,32 @@
         'Finalize the transaction'
 
         objects = self._objects
-        jars = {}
-        jarsv = None
-        subj = self._sub
-        subjars = ()
 
+        subjars = []
         if subtransaction:
-            if subj is None:
-                self._sub = subj = {}
+            if self._sub is None:
+                # Must store state across multiple subtransactions
+                # so that the final commit can commit all subjars.
+                self._sub = {}
         else:
-            if subj is not None:
+            if self._sub is not None:
+                # This commit is for a top-level transaction that
+                # has previously committed subtransactions.  Do
+                # one last subtransaction commit to clear out the
+                # current objects, then commit all the subjars.
                 if objects:
-                    # Do an implicit sub-transaction commit:
                     self.commit(1)
-                    # XXX What does this do?
                     objects = []
-                subjars = subj.values()
+                subjars = self._sub.values()
+                subjars.sort(jar_cmp)
                 self._sub = None
 
-        # If not a subtransaction, then we need to add any non-
-        # subtransaction-supporting objects that may have been
-        # stowed away during subtransaction commits to _objects.
-        if (subtransaction is None) and (self._non_st_objects is not None):
-            objects.extend(self._non_st_objects)
-            self._non_st_objects = None
+                # If there were any non-subtransaction-aware jars
+                # involved in earlier subtransaction commits, we need
+                # to add them to the list of jars to commit.
+                if self._non_st_objects is not None:
+                    objects.extend(self._non_st_objects)
+                    self._non_st_objects = None
 
         if (objects or subjars) and hosed:
             # Something really bad happened and we don't
@@ -188,88 +223,140 @@
         #   either call tpc_abort or tpc_finish. It is OK to call
         #   these multiple times, as the storage is required to ignore
         #   these calls if tpc_begin has not been called.
+        #
+        # - That we call tpc_begin() in a globally consistent order,
+        #   so that concurrent transactions involving multiple storages
+        #   do not deadlock.
         try:
             ncommitted = 0
+            jars = self._get_jars(objects, subtransaction)
             try:
-                ncommitted += self._commit_objects(objects, jars,
-                                                   subtransaction, subj)
-
-                self._commit_subtrans(jars, subjars)
-
-                jarsv = jars.values()
-                for jar in jarsv:
-                    if not subtransaction:
+                # If not subtransaction, then jars will be modified.
+                self._commit_begin(jars, subjars, subtransaction)
+                ncommitted += self._commit_objects(objects)
+                if not subtransaction:
+                    # Unless this is a really old jar that doesn't
+                    # implement tpc_vote(), it must raise an exception
+                    # if it can't commit the transaction.
+                    for jar in jars:
                         try:
                             vote = jar.tpc_vote
-                        except:
+                        except AttributeError:
                             pass
                         else:
-                            vote(self) # last chance to bail
+                            vote(self)
 
                 # Handle multiple jars separately.  If there are
                 # multiple jars and one fails during the finish, we
                 # mark this transaction manager as hosed.
-                if len(jarsv) == 1:
-                    self._finish_one(jarsv[0])
+                if len(jars) == 1:
+                    self._finish_one(jars[0])
                 else:
-                    self._finish_many(jarsv)
+                    self._finish_many(jars)
             except:
                 # Ugh, we got an got an error during commit, so we
-                # have to clean up.
-                exc_info = sys.exc_info()
-                if jarsv is None:
-                    jarsv = jars.values()
-                self._commit_error(exc_info, objects, ncommitted,
-                                   jarsv, subjars)
+                # have to clean up.  First save the original exception
+                # in case the cleanup process causes another
+                # exception.
+                t, v, tb = sys.exc_info()
+                try:
+                    self._commit_error(objects, ncommitted, jars, subjars)
+                except:
+                    LOG('ZODB', ERROR,
+                        "A storage error occured during transaction "
+                        "abort.  This shouldn't happen.",
+                        error=sys.exc_info())
+                    
+                raise t, v, tb
         finally:
             del objects[:] # clear registered
             if not subtransaction and self._id is not None:
                 free_transaction()
 
-    def _commit_objects(self, objects, jars, subtransaction, subj):
-        # commit objects and return number of commits
-        ncommitted = 0
+    def _get_jars(self, objects, subtransaction):
+        # Returns a list of jars for this transaction.
+        
+        # Find all the jars and sort them in a globally consistent order.
+        # objects is a list of persistent objects and jars.
+        # If this is a subtransaction and a jar is not subtransaction aware,
+        # it's object gets delayed until the parent transaction commits.
+        
+        d = {}
         for o in objects:
-            j = getattr(o, '_p_jar', o)
-            if j is not None:
-                i = id(j)
-                if not jars.has_key(i):
-                    jars[i] = j
-
-                    if subtransaction:
-                        # If a jar does not support subtransactions,
-                        # we need to save it away to be committed in
-                        # the outer transaction.
-                        try:
-                            j.tpc_begin(self, subtransaction)
-                        except TypeError:
-                            j.tpc_begin(self)
+            jar = getattr(o, '_p_jar', o)
+            if jar is None:
+                # I don't think this should ever happen, but can't
+                # prove that it won't.  If there is no jar, there
+                # is nothing to be done.
+                self.log("Object with no jar registered for transaction: "
+                         "%s" % repr(o), level=BLATHER)
+                continue
+            # jar may not be safe as a dictionary key
+            key = id(jar)
+            d[key] = jar
+
+            if subtransaction:
+                if hasattr(jar, "commit_sub"):
+                    self._sub[key] = jar
+                else:
+                    if self._non_st_objects is None:
+                        self._non_st_objects = []
+                    self._non_st_objects.append(o)
+                
+        jars = d.values()
+        jars.sort(jar_cmp)
 
-                        if hasattr(j, 'commit_sub'):
-                            subj[i] = j
-                        else:
-                            if self._non_st_objects is None:
-                                self._non_st_objects = []
-                            self._non_st_objects.append(o)
-                            continue
-                    else:
-                        j.tpc_begin(self)
-                j.commit(o, self)
+        return jars
+
+    def _commit_begin(self, jars, subjars, subtransaction):
+        if subtransaction:
+            assert not subjars
+            for jar in jars:
+                try:
+                    jar.tpc_begin(self, subtransaction)
+                except TypeError:
+                    # Assume that TypeError means that tpc_begin() only
+                    # takes one argument, and that the jar doesn't
+                    # support subtransactions.
+                    jar.tpc_begin(self)
+        else:
+            # Merge in all the jars used by one of the subtransactions.
+
+            # When the top-level subtransaction commits, the tm must
+            # call commit_sub() for each jar involved in one of the
+            # subtransactions.  The commit_sub() method should call
+            # tpc_begin() on the storage object.
+
+            # It must also call tpc_begin() on jars that were used in
+            # a subtransaction but don't support subtransactions.
+
+            # These operations must be performed on the jars in order.
+
+            # Modify jars inplace to include the subjars, too.
+            jars += subjars
+            jars.sort(jar_cmp)
+            # assume that subjars is small, so that it's cheaper to test
+            # whether jar in subjars than to make a dict and do has_key.
+            for jar in jars:
+                if jar in subjars:
+                    jar.commit_sub(self)
+                else:
+                    jar.tpc_begin(self)
+
+    def _commit_objects(self, objects):
+        ncommitted = 0
+        for o in objects:
+            jar = getattr(o, "_p_jar", o)
+            if jar is None:
+                continue
+            jar.commit(o, self)
             ncommitted += 1
         return ncommitted
 
-    def _commit_subtrans(self, jars, subjars):
-        # Commit work done in subtransactions
-        while subjars:
-            j = subjars.pop()
-            i = id(j)
-            if not jars.has_key(i):
-                jars[i] = j
-            j.commit_sub(self)
-
     def _finish_one(self, jar):
         try:
-            jar.tpc_finish(self) # This should never fail
+            # The database can't guarantee consistency if call fails.
+            jar.tpc_finish(self)
         except:
             # Bug if it does, we need to keep track of it
             LOG('ZODB', ERROR,
@@ -278,42 +365,40 @@
                 error=sys.exc_info())
             raise
 
-    def _finish_many(self, jarsv):
+    def _finish_many(self, jars):
         global hosed
         try:
-            while jarsv:
-                jarsv[-1].tpc_finish(self) # This should never fail
-                jarsv.pop() # It didn't, so it's taken care of.
+            for jar in jars:
+                # The database can't guarantee consistency if call fails.
+                jar.tpc_finish(self)
         except:
-            # Bug if it does, we need to yell FIRE!
-            # Someone finished, so don't allow any more
-            # work without at least a restart!
             hosed = 1
             LOG('ZODB', PANIC,
                 "A storage error occurred in the last phase of a "
                 "two-phase commit.  This shouldn\'t happen. "
-                "The application may be in a hosed state, so "
-                "transactions will not be allowed to commit "
+                "The application will not be allowed to commit "
                 "until the site/storage is reset by a restart. ",
                 error=sys.exc_info())
             raise
 
-    def _commit_error(self, (t, v, tb),
-                      objects, ncommitted, jarsv, subjars):
-        # handle an exception raised during commit
-        # takes sys.exc_info() as argument
-
-        # First, we have to abort any uncommitted objects.
+    def _commit_error(self, objects, ncommitted, jars, subjars):
+        # First, we have to abort any uncommitted objects.  The abort
+        # will mark the object for invalidation, so that it's last
+        # committed state will be restored.
         for o in objects[ncommitted:]:
             try:
                 j = getattr(o, '_p_jar', o)
                 if j is not None:
                     j.abort(o, self)
             except:
-                pass
-
-        # Then, we unwind TPC for the jars that began it.
-        for j in jarsv:
+                # nothing to do but log the error
+                self.log("Failed to abort object %016x" % utils.U64(o._p_oid),
+                         error=sys.exc_info())
+
+        # Abort the two-phase commit.  It's only necessary to abort the
+        # commit for jars that began it, but it is harmless to abort it
+        # for all.
+        for j in jars:
             try:
                 j.tpc_abort(self) # This should never fail
             except:
@@ -321,9 +406,14 @@
                     "A storage error occured during object abort. This "
                     "shouldn't happen. ", error=sys.exc_info())
 
-        # Ugh, we need to abort work done in sub-transactions.
-        while subjars:
-            j = subjars.pop()
+        # After the tpc_abort(), call abort_sub() on all the
+        # subtrans-aware jars to *really* abort the subtransaction.
+        
+        # Example: For Connection(), the tpc_abort() will abort the
+        # subtransaction TmpStore() and abort_sub() will remove the
+        # TmpStore.
+
+        for j in subjars:
             try:
                 j.abort_sub(self) # This should never fail
             except:
@@ -332,8 +422,6 @@
                     "object abort.  This shouldn't happen.",
                     error=sys.exc_info())
 
-        raise t, v, tb
-
     def register(self,object):
         'Register the given object for transaction control.'
         self._append(object)
@@ -365,8 +453,6 @@
 the system problem.  See your application log for
 information on the error that lead to this problem.
 """
-
-
 
 ############################################################################
 # install get_transaction:


=== ZODB3/ZODB/DB.py 1.43 => 1.43.8.1 ===
--- ZODB3/ZODB/DB.py:1.43	Wed Aug 14 18:07:09 2002
+++ ZODB3/ZODB/DB.py	Tue Nov 12 15:18:09 2002
@@ -577,7 +577,11 @@
         self.tpc_begin=s.tpc_begin
         self.tpc_vote=s.tpc_vote
         self.tpc_finish=s.tpc_finish
+        self._sortKey=s.sortKey
         get_transaction().register(self)
+
+    def sortKey(self):
+        return "%s:%s" % (self._sortKey(), id(self))
 
     def abort(self, reallyme, t): pass
 


=== ZODB3/ZODB/Connection.py 1.76.4.1 => 1.76.4.2 ===
--- ZODB3/ZODB/Connection.py:1.76.4.1	Fri Oct  4 20:28:13 2002
+++ ZODB3/ZODB/Connection.py	Tue Nov 12 15:18:09 2002
@@ -191,6 +191,14 @@
             return obj
         return self[oid]
 
+    def sortKey(self):
+        # XXX will raise an exception if the DB hasn't been set
+        storage_key = self._sortKey()
+        # If two connections use the same storage, give them a
+        # consistent order using id().  This is unique for the
+        # lifetime of a connection, which is good enough.
+        return "%s:%s" % (storage_key, id(self))
+
     def _setDB(self, odb):
         """Begin a new transaction.
 
@@ -198,6 +206,7 @@
         """
         self._db=odb
         self._storage=s=odb._storage
+        self._sortKey = odb._storage.sortKey
         self.new_oid=s.new_oid
         if self._code_timestamp != global_code_timestamp:
             # New code is in place.  Start a new cache.
@@ -268,27 +277,8 @@
         self.__onCommitActions.append((method_name, args, kw))
         get_transaction().register(self)
 
-    # NB: commit() is responsible for calling tpc_begin() on the storage.
-    # It uses self._begun to track whether it has been called.  When
-    # self._begun is 0, it has not been called.
-
-    # This arrangement allows us to handle the special case of a
-    # transaction with no modified objects.  It is possible for
-    # registration to be occur unintentionally and for a persistent
-    # object to compensate by making itself as unchanged.  When this
-    # happens, it's possible to commit a transaction with no modified
-    # objects.
-
-    # Since tpc_begin() may raise a ReadOnlyError, don't call it if there
-    # are no objects.  This avoids spurious (?) errors when working with
-    # a read-only storage.
-
     def commit(self, object, transaction):
         if object is self:
-            if not self._begun:
-                self._storage.tpc_begin(transaction)
-                self._begun = 1
-
             # We registered ourself.  Execute a commit action, if any.
             if self.__onCommitActions is not None:
                 method_name, args, kw = self.__onCommitActions.pop(0)
@@ -313,10 +303,6 @@
             # Nothing to do
             return
 
-        if not self._begun:
-            self._storage.tpc_begin(transaction)
-            self._begun = 1
-
         stack = [object]
 
         # Create a special persistent_id that passes T and the subobject
@@ -623,7 +609,6 @@
     def tpc_begin(self, transaction, sub=None):
         self._invalidating = []
         self._creating = []
-        self._begun = 0
 
         if sub:
             # Sub-transaction!
@@ -633,10 +618,7 @@
                 self._storage = _tmp
                 _tmp.registerDB(self._db, 0)
 
-            # It's okay to always call tpc_begin() for a sub-transaction
-            # because this isn't the real storage.
-            self._storage.tpc_begin(transaction)
-            self._begun = 1
+        self._storage.tpc_begin(transaction)
 
     def tpc_vote(self, transaction):
         if self.__onCommitActions is not None:


=== ZODB3/ZODB/ConflictResolution.py 1.14.2.1 => 1.14.2.2 ===
--- ZODB3/ZODB/ConflictResolution.py:1.14.2.1	Mon Oct 28 11:04:45 2002
+++ ZODB3/ZODB/ConflictResolution.py	Tue Nov 12 15:18:09 2002
@@ -78,7 +78,7 @@
     except (ImportError, AttributeError):
         zLOG.LOG("Conflict Resolution", zLOG.BLATHER,
                  "Unable to load class", error=sys.exc_info())
-        bad_class[class_tuple] = 1
+        bad_classes[class_tuple] = 1
         return None
     return klass
 


=== ZODB3/ZODB/BaseStorage.py 1.24.2.1 => 1.24.2.2 ===
--- ZODB3/ZODB/BaseStorage.py:1.24.2.1	Mon Oct 21 11:15:55 2002
+++ ZODB3/ZODB/BaseStorage.py	Tue Nov 12 15:18:09 2002
@@ -63,6 +63,15 @@
     def close(self):
         pass
 
+    def sortKey(self):
+        """Return a string that can be used to sort storage instances.
+
+        The key must uniquely identify a storage and must be the same
+        across multiple instantiations of the same storage.
+        """
+        # name may not be sufficient, e.g. ZEO has a user-definable name.
+        return self.__name__
+
     def getName(self):
         return self.__name__