[Zodb-checkins] CVS: ZODB3/ZODB - Transaction.py:1.39.16.2
Jeremy Hylton
jeremy@zope.com
Fri, 1 Nov 2002 14:34:38 -0500
Update of /cvs-repository/ZODB3/ZODB
In directory cvs.zope.org:/tmp/cvs-serv22082/ZODB
Modified Files:
Tag: ZODB3-deadlock-debug-branch
Transaction.py
Log Message:
Major rewrite to guarantee jars are locked in a globally consistent order.
Too many changes to do justice to in a checkin message. We'll write a
better comment when this gets committed to the trunk.
=== ZODB3/ZODB/Transaction.py 1.39.16.1 => 1.39.16.2 ===
--- ZODB3/ZODB/Transaction.py:1.39.16.1 Thu Oct 31 16:33:09 2002
+++ ZODB3/ZODB/Transaction.py Fri Nov 1 14:34:37 2002
@@ -19,23 +19,17 @@
import time, sys, struct, POSException
from struct import pack
from string import split, strip, join
-from zLOG import LOG, ERROR, PANIC
+from zLOG import LOG, ERROR, PANIC, INFO, BLATHER
from POSException import ConflictError
+from ZODB import utils
# Flag indicating whether certain errors have occurred.
hosed=0
-def get_jars_in_order(jars):
- # make sure jarsv contains the jars in the order they
- # were added
- L = []
- for jar, order in jars.values():
- L.append((order, jar))
- L.sort()
- jarsv = []
- for order, jar in L:
- jarsv.append(jar)
- return jarsv
+def jar_cmp(j1, j2):
+ # Call sortKey() every time, because a ZEO client could reconnect
+ # to a different server at any time.
+ return cmp(j1.sortKey(), j2.sortKey())
class Transaction:
'Simple transaction objects for single-threaded applications.'
@@ -65,6 +59,9 @@
for c in self._connections.values(): c.close()
del self._connections
+ def log(self, msg, level=INFO, error=None):
+ LOG("TM:%s" % self._id, level, msg, error=error)
+
def sub(self):
# Create a manually managed subtransaction for internal use
r=self.__class__()
@@ -89,6 +86,8 @@
entered two-phase commit yet, so no tpc_ messages are sent.
'''
+ self.log("abort")
+
if subtransaction and (self._non_st_objects is not None):
raise POSException.TransactionError, (
"""Attempted to abort a sub-transaction, but a participating
@@ -96,11 +95,8 @@
""")
t = None
- subj = self._sub
- subjars = ()
if not subtransaction:
-
# Must add in any non-subtransaction supporting objects that
# may have been stowed away from previous subtransaction
# commits.
@@ -108,11 +104,14 @@
self._objects.extend(self._non_st_objects)
self._non_st_objects = None
- if subj is not None:
+ if self._sub is not None:
# Abort of top-level transaction after commiting
# subtransactions.
- subjars = subj.values()
+ subjars = self._sub.values()
+ subjars.sort(jar_cmp)
self._sub = None
+ else:
+ subjars = []
try:
# Abort the objects
@@ -122,13 +121,20 @@
if j is not None:
j.abort(o, self)
except:
+ # Record the first exception that occurred
if t is None:
t, v, tb = sys.exc_info()
+ else:
+ self.log("Failed to abort object %016x" %
+ utils.U64(o._p_oid), error=sys.exc_info())
- # Ugh, we need to abort work done in sub-transactions.
- while subjars:
- j = subjars.pop()
- j.abort_sub(self) # This should never fail
+ # tpc_begin() was never called, so tpc_abort() should not be
+ # called.
+
+ if not subtransaction:
+ # abort_sub() must be called to clear subtransaction state
+ for jar in subjars:
+ jar.abort_sub(self) # This should never fail
if t is not None:
raise t, v, tb
@@ -148,7 +154,9 @@
This aborts any transaction in progres.
'''
- if self._objects: self.abort(subtransaction, 0)
+ if self._objects:
+ self.abort(subtransaction, 0)
+ self.log("begin")
if info:
info=split(info,'\t')
self.user=strip(info[0])
@@ -157,31 +165,36 @@
def commit(self, subtransaction=None):
'Finalize the transaction'
+ self.log("commit")
+
objects = self._objects
- jars = {}
- jarsv = None
- subj = self._sub
- subjars = ()
if subtransaction:
- if subj is None:
- self._sub = subj = {}
+ if self._sub is None:
+ # Must store state across multiple subtransactions
+ # so that the final commit can commit all subjars.
+ self._sub = {}
else:
- if subj is not None:
+ if self._sub is not None:
+ # This commit is for a top-level transaction that
+ # has previously committed subtransactions. Do
+ # one last subtransaction commit to clear out the
+ # current objects, then commit all the subjars.
if objects:
- # Do an implicit sub-transaction commit:
self.commit(1)
- # XXX What does this do?
objects = []
- subjars = subj.values()
+ subjars = self._sub.values()
+ subjars.sort(jar_cmp)
self._sub = None
- # If not a subtransaction, then we need to add any non-
- # subtransaction-supporting objects that may have been
- # stowed away during subtransaction commits to _objects.
- if (subtransaction is None) and (self._non_st_objects is not None):
- objects.extend(self._non_st_objects)
- self._non_st_objects = None
+ # If there were any non-subtransaction-aware jars
+ # involved in earlier subtransaction commits, we need
+ # to add them to the list of jars to commit.
+ if self._non_st_objects is not None:
+ objects.extend(self._non_st_objects)
+ self._non_st_objects = None
+ else:
+ subjars = []
if (objects or subjars) and hosed:
# Something really bad happened and we don't
@@ -200,90 +213,118 @@
# either call tpc_abort or tpc_finish. It is OK to call
# these multiple times, as the storage is required to ignore
# these calls if tpc_begin has not been called.
+ #
+ # - That we call tpc_begin() in a globally consistent order,
+ # so that concurrent transactions involving multiple storages
+ # do not deadlock.
try:
ncommitted = 0
+ jars = self._get_jars(objects, subtransaction)
try:
- # the values in jars will be 2-tuples containing a jar
- # and an int, where the int indicates the order in
- # which the jars were added.
- ncommitted += self._commit_objects(objects, jars,
- subtransaction, subj)
-
- self._commit_subtrans(jars, subjars)
- jarsv = get_jars_in_order(jars)
- for jar in jarsv:
- if not subtransaction:
+ self._commit_begin(jars, subtransaction)
+ ncommitted += self._commit_objects(objects)
+ if not subtransaction and subjars:
+ self._commit_subtrans(subjars)
+ if not subtransaction:
+ # Unless this is a really old jar that doesn't
+ # implement tpc_vote(), it must raise an exception
+ # if it can't commit the transaction.
+ for jar in jars:
try:
vote = jar.tpc_vote
- except:
+ except AttributeError:
pass
else:
- vote(self) # last chance to bail
+ vote(self)
# Handle multiple jars separately. If there are
# multiple jars and one fails during the finish, we
# mark this transaction manager as hosed.
- if len(jarsv) == 1:
- self._finish_one(jarsv[0])
+ if len(jars) == 1:
+ self._finish_one(jars[0])
else:
- self._finish_many(jarsv)
+ self._finish_many(jars)
except:
# Ugh, we got an got an error during commit, so we
- # have to clean up.
- exc_info = sys.exc_info()
- if jarsv is None:
- jarsv = get_jars_in_order(jars)
- self._commit_error(exc_info, objects, ncommitted,
- jarsv, subjars)
+ # have to clean up. First save the original exception
+ # in case the cleanup process causes another
+ # exception.
+ t, v, tb = sys.exc_info()
+ self._commit_error(objects, ncommitted, jars, subjars)
+ raise t, v, tb
finally:
del objects[:] # clear registered
if not subtransaction and self._id is not None:
free_transaction()
- def _commit_objects(self, objects, jars, subtransaction, subj):
- # commit objects and return number of commits
- ncommitted = 0
+ def _get_jars(self, objects, subtransaction):
+ # Returns a list of jars for this transaction.
+
+ # Find all the jars and sort them in a globally consistent order.
+ # objects is a list of persistent objects and jars.
+ # If this is a subtransaction and a jar is not subtransaction aware,
+ # its object gets delayed until the parent transaction commits.
+
+ d = {}
for o in objects:
- j = getattr(o, '_p_jar', o)
- if j is not None:
- i = id(j)
- if not jars.has_key(i):
- jars[i] = j, len(jars)
-
- if subtransaction:
- # If a jar does not support subtransactions,
- # we need to save it away to be committed in
- # the outer transaction.
- try:
- j.tpc_begin(self, subtransaction)
- except TypeError:
- j.tpc_begin(self)
+ jar = getattr(o, '_p_jar', o)
+ if jar is None:
+ # I don't think this should ever happen, but can't
+ # prove that it won't. If there is no jar, there
+ # is nothing to be done.
+ self.log("Object with no jar registered for transaction: "
+ "%s" % repr(o), level=BLATHER)
+ continue
+ # jar may not be safe as a dictionary key
+ key = id(jar)
+ d[key] = jar
+
+ if subtransaction:
+ if hasattr(jar, "commit_sub"):
+ self._sub[key] = jar
+ else:
+ if self._non_st_objects is None:
+ self._non_st_objects = []
+ self._non_st_objects.append(o)
+
+ jars = d.values()
+ jars.sort(jar_cmp)
+
+ return jars
+
+ def _commit_begin(self, jars, subtransaction):
+ for jar in jars:
+ if subtransaction:
+ try:
+ jar.tpc_begin(self, subtransaction)
+ except TypeError:
+ # Assume that TypeError means that tpc_begin() only
+ # takes one argument, and that the jar doesn't
+ # support subtransactions.
+ jar.tpc_begin(self)
+ else:
+ self.log("tpc_begin %s" % jar)
+ jar.tpc_begin(self)
- if hasattr(j, 'commit_sub'):
- subj[i] = j
- else:
- if self._non_st_objects is None:
- self._non_st_objects = []
- self._non_st_objects.append(o)
- continue
- else:
- j.tpc_begin(self)
- j.commit(o, self)
+ def _commit_objects(self, objects):
+ ncommitted = 0
+ for o in objects:
+ jar = getattr(o, "_p_jar", o)
+ if jar is None:
+ continue
+ jar.commit(o, self)
ncommitted += 1
return ncommitted
- def _commit_subtrans(self, jars, subjars):
+ def _commit_subtrans(self, subjars):
# Commit work done in subtransactions
- while subjars:
- j = subjars.pop()
- i = id(j)
- if not jars.has_key(i):
- jars[i] = j, len(jars)
- j.commit_sub(self)
+ for jar in subjars:
+ jar.commit_sub(self)
def _finish_one(self, jar):
try:
- jar.tpc_finish(self) # This should never fail
+ # The database can't guarantee consistency if call fails.
+ jar.tpc_finish(self)
except:
# Bug if it does, we need to keep track of it
LOG('ZODB', ERROR,
@@ -292,42 +333,40 @@
error=sys.exc_info())
raise
- def _finish_many(self, jarsv):
+ def _finish_many(self, jars):
global hosed
try:
- while jarsv:
- jarsv[-1].tpc_finish(self) # This should never fail
- jarsv.pop() # It didn't, so it's taken care of.
+ for jar in jars:
+ # The database can't guarantee consistency if call fails.
+ jar.tpc_finish(self)
except:
- # Bug if it does, we need to yell FIRE!
- # Someone finished, so don't allow any more
- # work without at least a restart!
hosed = 1
LOG('ZODB', PANIC,
"A storage error occurred in the last phase of a "
"two-phase commit. This shouldn\'t happen. "
- "The application may be in a hosed state, so "
- "transactions will not be allowed to commit "
+ "The application will not be allowed to commit "
"until the site/storage is reset by a restart. ",
error=sys.exc_info())
raise
- def _commit_error(self, (t, v, tb),
- objects, ncommitted, jarsv, subjars):
- # handle an exception raised during commit
- # takes sys.exc_info() as argument
-
- # First, we have to abort any uncommitted objects.
+ def _commit_error(self, objects, ncommitted, jars, subjars):
+ # First, we have to abort any uncommitted objects. The abort
+ # will mark the object for invalidation, so that its last
+ # committed state will be restored.
for o in objects[ncommitted:]:
try:
j = getattr(o, '_p_jar', o)
if j is not None:
j.abort(o, self)
except:
- pass
-
- # Then, we unwind TPC for the jars that began it.
- for j in jarsv:
+ # nothing to do but log the error
+ self.log("Failed to abort object %016x" % utils.U64(o._p_oid),
+ error=sys.exc_info())
+
+ # Abort the two-phase commit. It's only necessary to abort the
+ # commit for jars that began it, but it is harmless to abort it
+ # for all.
+ for j in jars:
try:
j.tpc_abort(self) # This should never fail
except:
@@ -335,9 +374,14 @@
"A storage error occured during object abort. This "
"shouldn't happen. ", error=sys.exc_info())
- # Ugh, we need to abort work done in sub-transactions.
- while subjars:
- j = subjars.pop()
+ # After the tpc_abort(), call abort_sub() on all the
+ # subtrans-aware jars to *really* abort the subtransaction.
+
+ # Example: For Connection(), the tpc_abort() will abort the
+ # subtransaction TmpStore() and abort_sub() will remove the
+ # TmpStore.
+
+ for j in subjars:
try:
j.abort_sub(self) # This should never fail
except:
@@ -346,8 +390,6 @@
"object abort. This shouldn't happen.",
error=sys.exc_info())
- raise t, v, tb
-
def register(self,object):
'Register the given object for transaction control.'
self._append(object)
@@ -379,8 +421,6 @@
the system problem. See your application log for
information on the error that lead to this problem.
"""
-
-
############################################################################
# install get_transaction: