[Zodb-checkins] CVS: ZODB3/ZODB - Transaction.py:1.39.2.1 DB.py:1.43.8.1 Connection.py:1.76.4.2 ConflictResolution.py:1.14.2.2 BaseStorage.py:1.24.2.2
Jeremy Hylton
jeremy@zope.com
Tue, 12 Nov 2002 15:18:43 -0500
Update of /cvs-repository/ZODB3/ZODB
In directory cvs.zope.org:/tmp/cvs-serv20159/ZODB
Modified Files:
Tag: ZODB3-3_1-branch
Transaction.py DB.py Connection.py ConflictResolution.py
BaseStorage.py
Log Message:
Merge the deadlock debug code to the release branch.
=== ZODB3/ZODB/Transaction.py 1.39 => 1.39.2.1 ===
--- ZODB3/ZODB/Transaction.py:1.39 Fri Sep 27 14:37:24 2002
+++ ZODB3/ZODB/Transaction.py Tue Nov 12 15:18:09 2002
@@ -19,12 +19,34 @@
import time, sys, struct, POSException
from struct import pack
from string import split, strip, join
-from zLOG import LOG, ERROR, PANIC
+from zLOG import LOG, ERROR, PANIC, INFO, BLATHER, WARNING
from POSException import ConflictError
+from ZODB import utils
# Flag indicating whether certain errors have occurred.
hosed=0
+# There is an order imposed on all jars, based on the storages they
+# serve, that must be consistent across all applications using the
+# storages. The order is defined by the sortKey() method of the jar.
+
+def jar_cmp(j1, j2):
+ # Call sortKey() every time, because a ZEO client could reconnect
+ # to a different server at any time.
+ try:
+ k1 = j1.sortKey()
+ except:
+ LOG("TM", WARNING, "jar missing sortKey() method: %s" % j1)
+ k1 = id(j1)
+
+ try:
+ k2 = j2.sortKey()
+ except:
+ LOG("TM", WARNING, "jar missing sortKey() method: %s" % j2)
+ k2 = id(j2)
+
+ return cmp(k1, k2)
+
class Transaction:
'Simple transaction objects for single-threaded applications.'
user=''
@@ -53,6 +75,9 @@
for c in self._connections.values(): c.close()
del self._connections
+ def log(self, msg, level=INFO, error=None):
+ LOG("TM:%s" % self._id, level, msg, error=error)
+
def sub(self):
# Create a manually managed subtransaction for internal use
r=self.__class__()
@@ -84,11 +109,8 @@
""")
t = None
- subj = self._sub
- subjars = ()
if not subtransaction:
-
# Must add in any non-subtransaction supporting objects that
# may have been stowed away from previous subtransaction
# commits.
@@ -96,11 +118,14 @@
self._objects.extend(self._non_st_objects)
self._non_st_objects = None
- if subj is not None:
+ if self._sub is not None:
# Abort of top-level transaction after commiting
# subtransactions.
- subjars = subj.values()
+ subjars = self._sub.values()
+ subjars.sort(jar_cmp)
self._sub = None
+ else:
+ subjars = []
try:
# Abort the objects
@@ -110,13 +135,20 @@
if j is not None:
j.abort(o, self)
except:
+ # Record the first exception that occurred
if t is None:
t, v, tb = sys.exc_info()
+ else:
+ self.log("Failed to abort object %016x" %
+ utils.U64(o._p_oid), error=sys.exc_info())
- # Ugh, we need to abort work done in sub-transactions.
- while subjars:
- j = subjars.pop()
- j.abort_sub(self) # This should never fail
+ # tpc_begin() was never called, so tpc_abort() should not be
+ # called.
+
+ if not subtransaction:
+ # abort_sub() must be called to clear subtransaction state
+ for jar in subjars:
+ jar.abort_sub(self) # This should never fail
if t is not None:
raise t, v, tb
@@ -136,7 +168,8 @@
This aborts any transaction in progres.
'''
- if self._objects: self.abort(subtransaction, 0)
+ if self._objects:
+ self.abort(subtransaction, 0)
if info:
info=split(info,'\t')
self.user=strip(info[0])
@@ -146,30 +179,32 @@
'Finalize the transaction'
objects = self._objects
- jars = {}
- jarsv = None
- subj = self._sub
- subjars = ()
+ subjars = []
if subtransaction:
- if subj is None:
- self._sub = subj = {}
+ if self._sub is None:
+ # Must store state across multiple subtransactions
+ # so that the final commit can commit all subjars.
+ self._sub = {}
else:
- if subj is not None:
+ if self._sub is not None:
+ # This commit is for a top-level transaction that
+ # has previously committed subtransactions. Do
+ # one last subtransaction commit to clear out the
+ # current objects, then commit all the subjars.
if objects:
- # Do an implicit sub-transaction commit:
self.commit(1)
- # XXX What does this do?
objects = []
- subjars = subj.values()
+ subjars = self._sub.values()
+ subjars.sort(jar_cmp)
self._sub = None
- # If not a subtransaction, then we need to add any non-
- # subtransaction-supporting objects that may have been
- # stowed away during subtransaction commits to _objects.
- if (subtransaction is None) and (self._non_st_objects is not None):
- objects.extend(self._non_st_objects)
- self._non_st_objects = None
+ # If there were any non-subtransaction-aware jars
+ # involved in earlier subtransaction commits, we need
+ # to add them to the list of jars to commit.
+ if self._non_st_objects is not None:
+ objects.extend(self._non_st_objects)
+ self._non_st_objects = None
if (objects or subjars) and hosed:
# Something really bad happened and we don't
@@ -188,88 +223,140 @@
# either call tpc_abort or tpc_finish. It is OK to call
# these multiple times, as the storage is required to ignore
# these calls if tpc_begin has not been called.
+ #
+ # - That we call tpc_begin() in a globally consistent order,
+ # so that concurrent transactions involving multiple storages
+ # do not deadlock.
try:
ncommitted = 0
+ jars = self._get_jars(objects, subtransaction)
try:
- ncommitted += self._commit_objects(objects, jars,
- subtransaction, subj)
-
- self._commit_subtrans(jars, subjars)
-
- jarsv = jars.values()
- for jar in jarsv:
- if not subtransaction:
+ # If not subtransaction, then jars will be modified.
+ self._commit_begin(jars, subjars, subtransaction)
+ ncommitted += self._commit_objects(objects)
+ if not subtransaction:
+ # Unless this is a really old jar that doesn't
+ # implement tpc_vote(), it must raise an exception
+ # if it can't commit the transaction.
+ for jar in jars:
try:
vote = jar.tpc_vote
- except:
+ except AttributeError:
pass
else:
- vote(self) # last chance to bail
+ vote(self)
# Handle multiple jars separately. If there are
# multiple jars and one fails during the finish, we
# mark this transaction manager as hosed.
- if len(jarsv) == 1:
- self._finish_one(jarsv[0])
+ if len(jars) == 1:
+ self._finish_one(jars[0])
else:
- self._finish_many(jarsv)
+ self._finish_many(jars)
except:
# Ugh, we got an got an error during commit, so we
- # have to clean up.
- exc_info = sys.exc_info()
- if jarsv is None:
- jarsv = jars.values()
- self._commit_error(exc_info, objects, ncommitted,
- jarsv, subjars)
+ # have to clean up. First save the original exception
+ # in case the cleanup process causes another
+ # exception.
+ t, v, tb = sys.exc_info()
+ try:
+ self._commit_error(objects, ncommitted, jars, subjars)
+ except:
+ LOG('ZODB', ERROR,
+ "A storage error occured during transaction "
+ "abort. This shouldn't happen.",
+ error=sys.exc_info())
+
+ raise t, v, tb
finally:
del objects[:] # clear registered
if not subtransaction and self._id is not None:
free_transaction()
- def _commit_objects(self, objects, jars, subtransaction, subj):
- # commit objects and return number of commits
- ncommitted = 0
+ def _get_jars(self, objects, subtransaction):
+ # Returns a list of jars for this transaction.
+
+ # Find all the jars and sort them in a globally consistent order.
+ # objects is a list of persistent objects and jars.
+ # If this is a subtransaction and a jar is not subtransaction aware,
+ # its object gets delayed until the parent transaction commits.
+
+ d = {}
for o in objects:
- j = getattr(o, '_p_jar', o)
- if j is not None:
- i = id(j)
- if not jars.has_key(i):
- jars[i] = j
-
- if subtransaction:
- # If a jar does not support subtransactions,
- # we need to save it away to be committed in
- # the outer transaction.
- try:
- j.tpc_begin(self, subtransaction)
- except TypeError:
- j.tpc_begin(self)
+ jar = getattr(o, '_p_jar', o)
+ if jar is None:
+ # I don't think this should ever happen, but can't
+ # prove that it won't. If there is no jar, there
+ # is nothing to be done.
+ self.log("Object with no jar registered for transaction: "
+ "%s" % repr(o), level=BLATHER)
+ continue
+ # jar may not be safe as a dictionary key
+ key = id(jar)
+ d[key] = jar
+
+ if subtransaction:
+ if hasattr(jar, "commit_sub"):
+ self._sub[key] = jar
+ else:
+ if self._non_st_objects is None:
+ self._non_st_objects = []
+ self._non_st_objects.append(o)
+
+ jars = d.values()
+ jars.sort(jar_cmp)
- if hasattr(j, 'commit_sub'):
- subj[i] = j
- else:
- if self._non_st_objects is None:
- self._non_st_objects = []
- self._non_st_objects.append(o)
- continue
- else:
- j.tpc_begin(self)
- j.commit(o, self)
+ return jars
+
+ def _commit_begin(self, jars, subjars, subtransaction):
+ if subtransaction:
+ assert not subjars
+ for jar in jars:
+ try:
+ jar.tpc_begin(self, subtransaction)
+ except TypeError:
+ # Assume that TypeError means that tpc_begin() only
+ # takes one argument, and that the jar doesn't
+ # support subtransactions.
+ jar.tpc_begin(self)
+ else:
+ # Merge in all the jars used by one of the subtransactions.
+
+ # When the top-level transaction commits, the tm must
+ # call commit_sub() for each jar involved in one of the
+ # subtransactions. The commit_sub() method should call
+ # tpc_begin() on the storage object.
+
+ # It must also call tpc_begin() on jars that were used in
+ # a subtransaction but don't support subtransactions.
+
+ # These operations must be performed on the jars in order.
+
+ # Modify jars inplace to include the subjars, too.
+ jars += subjars
+ jars.sort(jar_cmp)
+ # assume that subjars is small, so that it's cheaper to test
+ # whether jar in subjars than to make a dict and do has_key.
+ for jar in jars:
+ if jar in subjars:
+ jar.commit_sub(self)
+ else:
+ jar.tpc_begin(self)
+
+ def _commit_objects(self, objects):
+ ncommitted = 0
+ for o in objects:
+ jar = getattr(o, "_p_jar", o)
+ if jar is None:
+ continue
+ jar.commit(o, self)
ncommitted += 1
return ncommitted
- def _commit_subtrans(self, jars, subjars):
- # Commit work done in subtransactions
- while subjars:
- j = subjars.pop()
- i = id(j)
- if not jars.has_key(i):
- jars[i] = j
- j.commit_sub(self)
-
def _finish_one(self, jar):
try:
- jar.tpc_finish(self) # This should never fail
+ # The database can't guarantee consistency if this call fails.
+ jar.tpc_finish(self)
except:
# Bug if it does, we need to keep track of it
LOG('ZODB', ERROR,
@@ -278,42 +365,40 @@
error=sys.exc_info())
raise
- def _finish_many(self, jarsv):
+ def _finish_many(self, jars):
global hosed
try:
- while jarsv:
- jarsv[-1].tpc_finish(self) # This should never fail
- jarsv.pop() # It didn't, so it's taken care of.
+ for jar in jars:
+ # The database can't guarantee consistency if this call fails.
+ jar.tpc_finish(self)
except:
- # Bug if it does, we need to yell FIRE!
- # Someone finished, so don't allow any more
- # work without at least a restart!
hosed = 1
LOG('ZODB', PANIC,
"A storage error occurred in the last phase of a "
"two-phase commit. This shouldn\'t happen. "
- "The application may be in a hosed state, so "
- "transactions will not be allowed to commit "
+ "The application will not be allowed to commit "
"until the site/storage is reset by a restart. ",
error=sys.exc_info())
raise
- def _commit_error(self, (t, v, tb),
- objects, ncommitted, jarsv, subjars):
- # handle an exception raised during commit
- # takes sys.exc_info() as argument
-
- # First, we have to abort any uncommitted objects.
+ def _commit_error(self, objects, ncommitted, jars, subjars):
+ # First, we have to abort any uncommitted objects. The abort
+ # will mark the object for invalidation, so that its last
+ # committed state will be restored.
for o in objects[ncommitted:]:
try:
j = getattr(o, '_p_jar', o)
if j is not None:
j.abort(o, self)
except:
- pass
-
- # Then, we unwind TPC for the jars that began it.
- for j in jarsv:
+ # nothing to do but log the error
+ self.log("Failed to abort object %016x" % utils.U64(o._p_oid),
+ error=sys.exc_info())
+
+ # Abort the two-phase commit. It's only necessary to abort the
+ # commit for jars that began it, but it is harmless to abort it
+ # for all.
+ for j in jars:
try:
j.tpc_abort(self) # This should never fail
except:
@@ -321,9 +406,14 @@
"A storage error occured during object abort. This "
"shouldn't happen. ", error=sys.exc_info())
- # Ugh, we need to abort work done in sub-transactions.
- while subjars:
- j = subjars.pop()
+ # After the tpc_abort(), call abort_sub() on all the
+ # subtrans-aware jars to *really* abort the subtransaction.
+
+ # Example: For Connection(), the tpc_abort() will abort the
+ # subtransaction TmpStore() and abort_sub() will remove the
+ # TmpStore.
+
+ for j in subjars:
try:
j.abort_sub(self) # This should never fail
except:
@@ -332,8 +422,6 @@
"object abort. This shouldn't happen.",
error=sys.exc_info())
- raise t, v, tb
-
def register(self,object):
'Register the given object for transaction control.'
self._append(object)
@@ -365,8 +453,6 @@
the system problem. See your application log for
information on the error that lead to this problem.
"""
-
-
############################################################################
# install get_transaction:
=== ZODB3/ZODB/DB.py 1.43 => 1.43.8.1 ===
--- ZODB3/ZODB/DB.py:1.43 Wed Aug 14 18:07:09 2002
+++ ZODB3/ZODB/DB.py Tue Nov 12 15:18:09 2002
@@ -577,7 +577,11 @@
self.tpc_begin=s.tpc_begin
self.tpc_vote=s.tpc_vote
self.tpc_finish=s.tpc_finish
+ self._sortKey=s.sortKey
get_transaction().register(self)
+
+ def sortKey(self):
+ return "%s:%s" % (self._sortKey(), id(self))
def abort(self, reallyme, t): pass
=== ZODB3/ZODB/Connection.py 1.76.4.1 => 1.76.4.2 ===
--- ZODB3/ZODB/Connection.py:1.76.4.1 Fri Oct 4 20:28:13 2002
+++ ZODB3/ZODB/Connection.py Tue Nov 12 15:18:09 2002
@@ -191,6 +191,14 @@
return obj
return self[oid]
+ def sortKey(self):
+ # XXX will raise an exception if the DB hasn't been set
+ storage_key = self._sortKey()
+ # If two connections use the same storage, give them a
+ # consistent order using id(). This is unique for the
+ # lifetime of a connection, which is good enough.
+ return "%s:%s" % (storage_key, id(self))
+
def _setDB(self, odb):
"""Begin a new transaction.
@@ -198,6 +206,7 @@
"""
self._db=odb
self._storage=s=odb._storage
+ self._sortKey = odb._storage.sortKey
self.new_oid=s.new_oid
if self._code_timestamp != global_code_timestamp:
# New code is in place. Start a new cache.
@@ -268,27 +277,8 @@
self.__onCommitActions.append((method_name, args, kw))
get_transaction().register(self)
- # NB: commit() is responsible for calling tpc_begin() on the storage.
- # It uses self._begun to track whether it has been called. When
- # self._begun is 0, it has not been called.
-
- # This arrangement allows us to handle the special case of a
- # transaction with no modified objects. It is possible for
- # registration to be occur unintentionally and for a persistent
- # object to compensate by making itself as unchanged. When this
- # happens, it's possible to commit a transaction with no modified
- # objects.
-
- # Since tpc_begin() may raise a ReadOnlyError, don't call it if there
- # are no objects. This avoids spurious (?) errors when working with
- # a read-only storage.
-
def commit(self, object, transaction):
if object is self:
- if not self._begun:
- self._storage.tpc_begin(transaction)
- self._begun = 1
-
# We registered ourself. Execute a commit action, if any.
if self.__onCommitActions is not None:
method_name, args, kw = self.__onCommitActions.pop(0)
@@ -313,10 +303,6 @@
# Nothing to do
return
- if not self._begun:
- self._storage.tpc_begin(transaction)
- self._begun = 1
-
stack = [object]
# Create a special persistent_id that passes T and the subobject
@@ -623,7 +609,6 @@
def tpc_begin(self, transaction, sub=None):
self._invalidating = []
self._creating = []
- self._begun = 0
if sub:
# Sub-transaction!
@@ -633,10 +618,7 @@
self._storage = _tmp
_tmp.registerDB(self._db, 0)
- # It's okay to always call tpc_begin() for a sub-transaction
- # because this isn't the real storage.
- self._storage.tpc_begin(transaction)
- self._begun = 1
+ self._storage.tpc_begin(transaction)
def tpc_vote(self, transaction):
if self.__onCommitActions is not None:
=== ZODB3/ZODB/ConflictResolution.py 1.14.2.1 => 1.14.2.2 ===
--- ZODB3/ZODB/ConflictResolution.py:1.14.2.1 Mon Oct 28 11:04:45 2002
+++ ZODB3/ZODB/ConflictResolution.py Tue Nov 12 15:18:09 2002
@@ -78,7 +78,7 @@
except (ImportError, AttributeError):
zLOG.LOG("Conflict Resolution", zLOG.BLATHER,
"Unable to load class", error=sys.exc_info())
- bad_class[class_tuple] = 1
+ bad_classes[class_tuple] = 1
return None
return klass
=== ZODB3/ZODB/BaseStorage.py 1.24.2.1 => 1.24.2.2 ===
--- ZODB3/ZODB/BaseStorage.py:1.24.2.1 Mon Oct 21 11:15:55 2002
+++ ZODB3/ZODB/BaseStorage.py Tue Nov 12 15:18:09 2002
@@ -63,6 +63,15 @@
def close(self):
pass
+ def sortKey(self):
+ """Return a string that can be used to sort storage instances.
+
+ The key must uniquely identify a storage and must be the same
+ across multiple instantiations of the same storage.
+ """
+ # name may not be sufficient, e.g. ZEO has a user-definable name.
+ return self.__name__
+
def getName(self):
return self.__name__