[Zope-Checkins] CVS: Zope3/lib/python/Transaction - _defaultTransaction.py:1.3
Jeremy Hylton
jeremy@zope.com
Fri, 21 Jun 2002 16:35:04 -0400
Update of /cvs-repository/Zope3/lib/python/Transaction
In directory cvs.zope.org:/tmp/cvs-serv32553
Modified Files:
_defaultTransaction.py
Log Message:
Break up commit() into many small pieces.
=== Zope3/lib/python/Transaction/_defaultTransaction.py 1.2 => 1.3 ===
hosed = 0
+already_hosed_text = \
+ """A serious error, which was probably a system error,
+ occurred in a previous database transaction. This
+ application may be in an invalid state and must be
+ restarted before database updates can be allowed.
+
+ Beware though that if the error was due to a serious system
+ problem, such as a disk full condition, then the
+ application may not come up until you deal with the system
+ problem. See your application log for information on the
+ error that lead to this problem.
+ """
+
+hosed_text = \
+ """A storage error occurred in the last phase of a
+ two-phase commit. This shouldn't happen. The application
+ may be in a hosed state, so transactions will not be
+ allowed to commit until the site/storage is reset by a
+ restart. """
+
+
class Transaction:
"""Simple transaction objects for single-threaded applications."""
@@ -141,181 +162,196 @@
def commit(self, subtransaction=None):
"""Finalize the transaction."""
+ # This method does all the setup, then calls _commit().
+
global hosed
-
- objects=self._objects
- jars={}
+
+ # Make a local reference to objects, which will be rebound to a
+ # new list if there is an implicit subtransaction commit
+ objects = self._objects
+ jars = {}
jarsv = None
- subj=self._sub
- subjars=()
+ subjars = ()
if subtransaction:
- if subj is None: self._sub=subj={}
+ if self._sub is None:
+ self._sub = {}
else:
- if subj is not None:
+ if self._sub is not None:
if objects:
# Do an implicit sub-transaction commit:
self.commit(1)
- objects=[]
- subjars=subj.values()
- self._sub=None
+ objects = []
+ subjars = self._sub.values()
+ self._sub = None
# If not a subtransaction, then we need to add any non-
# subtransaction-supporting objects that may have been
# stowed away during subtransaction commits to _objects.
if (subtransaction is None) and (self._non_st_objects is not None):
- append=objects.append
- for object in self._non_st_objects:
- append(object)
+ objects.extend(self._non_st_objects)
self._non_st_objects = None
- t=v=tb=None
+ t = v = tb = None
if (objects or subjars) and hosed:
# Something really bad happened and we don't
# trust the system state.
- raise TransactionError, (
-
- """A serious error, which was probably a system error,
- occurred in a previous database transaction. This
- application may be in an invalid state and must be
- restarted before database updates can be allowed.
-
- Beware though that if the error was due to a serious
- system problem, such as a disk full condition, then
- the application may not come up until you deal with
- the system problem. See your application log for
- information on the error that lead to this problem.
- """)
+ raise TransactionError(already_hosed_text)
try:
+ self._commit(objects, jars, subtransaction, subjars)
+ finally:
+ tb = None
+ del objects[:] # clear registered
+ if not subtransaction and self._id is not None:
+ free_transaction()
- # It's important that:
- #
- # - Every object in self._objects is either committed
- # or aborted.
- #
- # - For each object that is committed
- # we call tpc_begin on it's jar at least once
- #
- # - For every jar for which we've called tpc_begin on,
- # we either call tpc_abort or tpc_finish. It is OK
- # to call these multiple times, as the storage is
- # required to ignore these calls if tpc_begin has not
- # been called.
-
- ncommitted=0
- try:
- for o in objects:
- j=getattr(o, '_p_jar', o)
- if j is not None:
- i=id(j)
- if not (i in jars):
- jars[i]=j
- if subtransaction:
-
- # If a jar does not support subtransactions,
- # we need to save it away to be committed in
- # the outer transaction.
- try:
- j.tpc_begin(self, subtransaction)
- except TypeError:
- j.tpc_begin(self)
-
- if hasattr(j, 'commit_sub'):
- subj[i] = j
- else:
- if self._non_st_objects is None:
- self._non_st_objects = []
- self._non_st_objects.append(o)
- continue
-
- else:
- j.tpc_begin(self)
- j.commit(o, self)
- ncommitted = ncommitted+1
-
- # Commit work done in subtransactions
- while subjars:
- j=subjars.pop()
- i=id(j)
- if not (i in jars):
- jars[i]=j
- j.commit_sub(self)
-
- jarsv = jars.values()
- for jar in jarsv:
- if not subtransaction:
- jar.tpc_vote(self)
+ def _commit(self, objects, jars, subtransaction, subjars):
+ # Do the real work of commit
+
+ # It's important that:
+ #
+ # - Every object in self._objects is either committed
+ # or aborted.
+ #
+ # - For each object that is committed
+ # we call tpc_begin on its jar at least once
+ #
+ # - For every jar for which we've called tpc_begin on,
+ # we either call tpc_abort or tpc_finish. It is OK
+ # to call these multiple times, as the storage is
+ # required to ignore these calls if tpc_begin has not
+ # been called.
- try:
- # Try to finish one jar, since we may be able to
- # recover if the first one fails.
- if jarsv:
- jarsv[-1].tpc_finish(self) # This should never fail
- jarsv.pop() # It didn't, so it's taken care of.
- except:
- # Bug if it does, we need to keep track of it
- LOG("Transaction", ERROR,
- "A storage error occurred in the last phase of a "
- "two-phase commit. This shouldn\'t happen. ",
- error=sys.exc_info())
- raise
+ ncommitted = 0
+ jarsv = None
+ try:
+ for o in objects:
+ j = getattr(o, '_p_jar', o)
+ if j is not None:
+ i = id(j)
+ if i not in jars:
+ jars[i] = j
+
+ if subtransaction:
+ if not self._subtrans_begin(o, j, subtransaction):
+ # The jar does not support subtransactions
+ continue
+ else:
+ j.tpc_begin(self)
+ j.commit(o, self)
+ ncommitted += 1
+
+ jars.update(self._subtrans_commit(subjars))
+
+ jarsv = jars.values()
+ for jar in jarsv:
+ if not subtransaction:
+ jar.tpc_vote(self)
+
+ # commit one first, because we can still recover if only
+ # the first one fails
+ self._commit_one(jarsv)
+ self._commit_rest(jarsv)
+ except:
+ t, v, tb = sys.exc_info()
- try:
- while jarsv:
- jarsv[-1].tpc_finish(self) # This should never fail
- jarsv.pop() # It didn't, so it's taken care of.
- except:
- # Bug if it does, we need to yell FIRE!
- # Someone finished, so don't allow any more
- # work without at least a restart!
- hosed=1
- LOG("Transaction", PANIC,
- "A storage error occurred in the last phase of a "
- "two-phase commit. This shouldn't happen. "
- "The application may be in a hosed state, so "
- "transactions will not be allowed to commit "
- "until the site/storage is reset by a restart. ",
- error=sys.exc_info())
- raise
-
- except:
- t, v, tb = sys.exc_info()
+ if jarsv is None:
+ cleanup_jars = jars.values()
+ else:
+ cleanup_jars = jarsv
+ self._commit_failed(objects[ncommitted:], cleanup_jars, subjars)
- # Ugh, we got an got an error during commit, so we
- # have to clean up.
+ raise t, v, tb
- # First, we have to abort any uncommitted objects.
- for o in objects[ncommitted:]:
- try:
- j=getattr(o, '_p_jar', o)
- if j is not None: j.abort(o, self)
- except: pass
-
- # Then, we unwind TPC for the jars that began it.
- if jarsv is None:
- jarsv = jars.values()
- for j in jarsv:
- try: j.tpc_abort(self) # This should never fail
- except:
- LOG("Transaction", ERROR,
- "A storage error occured during object abort "
- "This shouldn't happen. ",
- error=sys.exc_info())
-
- # Ugh, we need to abort work done in sub-transactions.
- while subjars:
- j=subjars.pop()
- j.abort_sub(self) # This should never fail
+ def _commit_one(self, jarsv):
+ try:
+ # Try to finish one jar, since we may be able to recover
+ # if the first one fails.
+ if jarsv:
+ jarsv[-1].tpc_finish(self) # This should never fail
+ jarsv.pop() # It didn't, so it's taken care of.
+ except:
+ # Bug if it does, we need to keep track of it
+ LOG("Transaction", ERROR,
+ "A storage error occurred in the last phase of a "
+ "two-phase commit. This shouldn\'t happen. ",
+ error=sys.exc_info())
+ raise
- raise t,v,tb
+ def _commit_rest(self, jarsv):
+ global hosed
+ try:
+ while jarsv:
+ jarsv[-1].tpc_finish(self) # This should never fail
+ jarsv.pop() # It didn't, so it's taken care of.
+ except:
+ # But if it does, we need to yell FIRE! Someone finished,
+ # so don't allow any more work without at least a restart!
+ hosed = 1
+ LOG("Transaction", PANIC, hosed_text, error=sys.exc_info())
+ raise
+
+ def _commit_failed(self, uncommitted, jarsv, subjars):
+ # Ugh, we got an error during commit, so we have to
+ # clean up.
- finally:
- tb = None
- del objects[:] # clear registered
- if not subtransaction and self._id is not None:
- free_transaction()
+ # First, we have to abort any uncommitted objects.
+ for o in uncommitted:
+ try:
+ j = getattr(o, '_p_jar', o)
+ if j is not None:
+ j.abort(o, self)
+ except:
+ LOG("Transaction", ERROR,
+ "An error occured while cleaning up a failed commit.",
+ error=sys.exc_info())
+
+ # Then, we unwind TPC for the jars that began it.
+ for j in jarsv:
+ try:
+ j.tpc_abort(self) # This should never fail
+ except:
+ LOG("Transaction", ERROR,
+ "A storage error occured during object abort "
+ "This shouldn't happen. ",
+ error=sys.exc_info())
+
+ # Ugh, we need to abort work done in sub-transactions.
+ for j in subjars:
+ j.abort_sub(self) # This should never fail
+ # XXX Shouldn't we have a try-except anyway?
+
+ def _subtrans_begin(self, obj, jar, subtransaction):
+ # If a jar does not support subtransactions, we need to save
+ # it away to be committed in the outer transaction.
+
+ # XXX I think subtransaction is always 1, and, thus, doesn't
+ # need to be passed
+ assert subtransaction == 1
+ try:
+ jar.tpc_begin(self, subtransaction)
+ except TypeError:
+ jar.tpc_begin(self)
+
+ if hasattr(jar, 'commit_sub'):
+ self._sub[id(jar)] = jar
+ return 1
+ else:
+ if self._non_st_objects is None:
+ self._non_st_objects = []
+ self._non_st_objects.append(obj)
+ return 0
+
+ def _subtrans_commit(self, subjars):
+ jars = {}
+ # Commit work done in subtransactions
+ while subjars:
+ j = subjars.pop()
+ jars[id(j)] = j
+ j.commit_sub(self)
+ return jars
def register(self, object):
"""Register the given object for transaction control."""