[Zodb-checkins] SVN: ZODB/trunk/src/ Merge Jim's savepoint/rollback
work from the 3.4 branch. Yay!
Tim Peters
tim.one at comcast.net
Sun Apr 24 01:13:16 EDT 2005
Log message for revision 30136:
Merge Jim's savepoint/rollback work from the 3.4 branch. Yay!
Original checkin msgs follow:
r30131 | jim | 2005-04-23 23:33:29 -0400 (Sat, 23 Apr 2005) | 5 lines
M /ZODB/branches/3.4/src/ZODB/Connection.py
M /ZODB/branches/3.4/src/ZODB/tests/testConnectionSavepoint.py
Fixed a bug in commits with savepoints and changes since savepoints.
Once we start using savepoints, we need to make sure that all data are
committed through the savepoints. Otherwise, things can get committed
in the wrong order, leading to conflicts.
r30130 | jim | 2005-04-23 23:02:00 -0400 (Sat, 23 Apr 2005) | 6 lines
M /ZODB/branches/3.4/src/ZODB/Connection.py
M /ZODB/branches/3.4/src/ZODB/tests/testConnectionSavepoint.py
Fixed a bug that caused assertion errors if an object was added in a
savepoint, then modified and then aborted.
Also added missing code to clear registered objects when a savepoint
was rolled back.
r30129 | jim | 2005-04-23 21:29:02 -0400 (Sat, 23 Apr 2005) | 5 lines
M /ZODB/branches/3.4/src/ZODB/Connection.py
D /ZODB/branches/3.4/src/ZODB/TmpStore.py
M /ZODB/branches/3.4/src/ZODB/tests/testConnection.py
A /ZODB/branches/3.4/src/ZODB/tests/testConnectionSavepoint.py
A /ZODB/branches/3.4/src/ZODB/tests/testConnectionSavepoint.txt
M /ZODB/branches/3.4/src/ZODB/tests/testZODB.py
M /ZODB/branches/3.4/src/ZODB/tests/test_datamanageradapter.py
M /ZODB/branches/3.4/src/transaction/__init__.py
M /ZODB/branches/3.4/src/transaction/_manager.py
M /ZODB/branches/3.4/src/transaction/_transaction.py
M /ZODB/branches/3.4/src/transaction/interfaces.py
A /ZODB/branches/3.4/src/transaction/savepoint.txt
A /ZODB/branches/3.4/src/transaction/tests/savepointsample.py
M /ZODB/branches/3.4/src/transaction/tests/test_register_compat.py
A /ZODB/branches/3.4/src/transaction/tests/test_savepoint.py
Added savepoints!
(And also added interfaces and rearranged some code to hopefully make
it easier to read.)
r30128 | jim | 2005-04-23 21:28:59 -0400 (Sat, 23 Apr 2005) | 2 lines
M /ZODB/branches/3.4/src/transaction/tests/test_transaction.py
Removed some tests that don't make sense after the savepoint refactoring
r30127 | jim | 2005-04-23 21:28:57 -0400 (Sat, 23 Apr 2005) | 2 lines
M /ZODB/branches/3.4/src/persistent/interfaces.py
Commented out mtime
Changed:
U ZODB/trunk/src/ZODB/Connection.py
D ZODB/trunk/src/ZODB/TmpStore.py
U ZODB/trunk/src/ZODB/tests/testConnection.py
A ZODB/trunk/src/ZODB/tests/testConnectionSavepoint.py
A ZODB/trunk/src/ZODB/tests/testConnectionSavepoint.txt
U ZODB/trunk/src/ZODB/tests/testZODB.py
U ZODB/trunk/src/ZODB/tests/test_datamanageradapter.py
U ZODB/trunk/src/persistent/interfaces.py
U ZODB/trunk/src/transaction/__init__.py
U ZODB/trunk/src/transaction/_manager.py
U ZODB/trunk/src/transaction/_transaction.py
U ZODB/trunk/src/transaction/interfaces.py
A ZODB/trunk/src/transaction/savepoint.txt
A ZODB/trunk/src/transaction/tests/savepointsample.py
U ZODB/trunk/src/transaction/tests/test_register_compat.py
A ZODB/trunk/src/transaction/tests/test_savepoint.py
U ZODB/trunk/src/transaction/tests/test_transaction.py
-=-
Modified: ZODB/trunk/src/ZODB/Connection.py
===================================================================
--- ZODB/trunk/src/ZODB/Connection.py 2005-04-24 04:26:04 UTC (rev 30135)
+++ ZODB/trunk/src/ZODB/Connection.py 2005-04-24 05:13:15 UTC (rev 30136)
@@ -17,6 +17,7 @@
import logging
import sys
+import tempfile
import threading
import warnings
from time import time
@@ -33,13 +34,12 @@
from ZODB.ConflictResolution import ResolvedSerial
from ZODB.ExportImport import ExportImport
-from ZODB.POSException \
- import ConflictError, ReadConflictError, InvalidObjectReference, \
- ConnectionStateError
-from ZODB.TmpStore import TmpStore
+from ZODB import POSException
+from ZODB.POSException import InvalidObjectReference, ConnectionStateError
+from ZODB.POSException import ConflictError, ReadConflictError
from ZODB.serialize import ObjectWriter, ObjectReader, myhasattr
-from ZODB.utils import u64, oid_repr, z64, positive_id, \
- DEPRECATED_ARGUMENT, deprecated36
+from ZODB.utils import DEPRECATED_ARGUMENT, deprecated36
+from ZODB.utils import p64, u64, z64, oid_repr, positive_id
global_reset_counter = 0
@@ -61,17 +61,18 @@
implements(IConnection, IDataManager, IPersistentDataManager)
- _tmp = None
+ _storage = _normal_storage = _savepoint_storage = None
+
_code_timestamp = 0
- # ZODB.IConnection
+ ##########################################################################
+ # Connection methods, ZODB.IConnection
def __init__(self, version='', cache_size=400,
cache_deactivate_after=None, mvcc=True, txn_mgr=None,
synch=True):
"""Create a new Connection."""
self._log = logging.getLogger("ZODB.Connection")
- self._storage = None
self._debug_info = ()
self._opened = None # time.time() when DB.open() opened us
@@ -150,39 +151,6 @@
self.connections = None
- def get_connection(self, database_name):
- """Return a Connection for the named database."""
- connection = self.connections.get(database_name)
- if connection is None:
- new_con = self._db.databases[database_name].open()
- self.connections.update(new_con.connections)
- new_con.connections = self.connections
- connection = new_con
- return connection
-
- def get(self, oid):
- """Return the persistent object with oid 'oid'."""
- if self._storage is None:
- raise ConnectionStateError("The database connection is closed")
-
- obj = self._cache.get(oid, None)
- if obj is not None:
- return obj
- obj = self._added.get(oid, None)
- if obj is not None:
- return obj
-
- p, serial = self._storage.load(oid, self._version)
- obj = self._reader.getGhost(p)
-
- obj._p_oid = oid
- obj._p_jar = self
- obj._p_changed = None
- obj._p_serial = serial
-
- self._cache[oid] = obj
- return obj
-
def add(self, obj):
"""Add a new object 'obj' to the database and assign it an oid."""
if self._storage is None:
@@ -207,52 +175,41 @@
elif obj._p_jar is not self:
raise InvalidObjectReference(obj, obj._p_jar)
- def sortKey(self):
- """Return a consistent sort key for this connection."""
- return "%s:%s" % (self._storage.sortKey(), id(self))
+ def get(self, oid):
+ """Return the persistent object with oid 'oid'."""
+ if self._storage is None:
+ raise ConnectionStateError("The database connection is closed")
- def abort(self, transaction):
- """Abort a transaction and forget all changes."""
- for obj in self._registered_objects:
- oid = obj._p_oid
- assert oid is not None
- if oid in self._added:
- del self._added[oid]
- del obj._p_jar
- del obj._p_oid
- else:
+ obj = self._cache.get(oid, None)
+ if obj is not None:
+ return obj
+ obj = self._added.get(oid, None)
+ if obj is not None:
+ return obj
- # Note: If we invalidate a non-ghostifiable object
- # (i.e. a persistent class), the object will
- # immediately reread it's state. That means that the
- # following call could result in a call to
- # self.setstate, which, of course, must suceed.
- # In general, it would be better if the read could be
- # delayed until the start of the next transaction. If
- # we read at the end of a transaction and if the
- # object was invalidated during this transaction, then
- # we'll read non-current data, which we'll discard
- # later in transaction finalization. Unfortnately, we
- # can only delay the read if this abort corresponds to
- # a top-level-transaction abort. We can't tell if
- # this is a top-level-transaction abort, so we have to
- # go ahead and invalidate now. Fortunately, it's
- # pretty unlikely that the object we are invalidating
- # was invalidated by another thread, so the risk of a
- # reread is pretty low.
+ p, serial = self._storage.load(oid, self._version)
+ obj = self._reader.getGhost(p)
- self._cache.invalidate(oid)
+ obj._p_oid = oid
+ obj._p_jar = self
+ obj._p_changed = None
+ obj._p_serial = serial
- self._tpc_cleanup()
+ self._cache[oid] = obj
+ return obj
+ def cacheMinimize(self, dt=DEPRECATED_ARGUMENT):
+ """Deactivate all unmodified objects in the cache."""
+ if dt is not DEPRECATED_ARGUMENT:
+ deprecated36("cacheMinimize() dt= is ignored.")
+ self._cache.minimize()
+
# TODO: we should test what happens when cacheGC is called mid-transaction.
-
def cacheGC(self):
"""Reduce cache size to target size."""
self._cache.incrgc()
__onCloseCallbacks = None
-
def onCloseCallback(self, f):
"""Register a callable, f, to be called by close()."""
if self.__onCloseCallbacks is None:
@@ -266,12 +223,6 @@
raise ConnectionStateError("Cannot close a connection joined to "
"a transaction")
- if self._tmp is not None:
- # There are no direct modifications pending, but a subtransaction
- # is pending.
- raise ConnectionStateError("Cannot close a connection with a "
- "pending subtransaction")
-
if self._cache is not None:
self._cache.incrgc() # This is a good time to do some GC
@@ -285,7 +236,8 @@
self._log.error("Close callback failed for %s", f,
exc_info=sys.exc_info())
self.__onCloseCallbacks = None
- self._storage = self._tmp = self.new_oid = None
+ self._storage = self._savepoint_storage = self._normal_storage = None
+ self.new_oid = None
self._debug_info = ()
self._opened = None
# Return the connection to the pool.
@@ -297,10 +249,208 @@
# assert that here, because self may have been reused (by
# another thread) by the time we get back here.
- # transaction.interfaces.IDataManager
+ def db(self):
+ """Returns a handle to the database this connection belongs to."""
+ return self._db
+ def isReadOnly(self):
+ """Returns True if the storage for this connection is read only."""
+ if self._storage is None:
+ raise ConnectionStateError("The database connection is closed")
+ return self._storage.isReadOnly()
+
+ def invalidate(self, tid, oids):
+ """Notify the Connection that transaction 'tid' invalidated oids."""
+ self._inv_lock.acquire()
+ try:
+ if self._txn_time is None:
+ self._txn_time = tid
+ self._invalidated.update(oids)
+ finally:
+ self._inv_lock.release()
+
+ def root(self):
+ """Return the database root object."""
+ return self.get(z64)
+
+ def getVersion(self):
+ """Returns the version this connection is attached to."""
+ if self._storage is None:
+ raise ConnectionStateError("The database connection is closed")
+ return self._version
+
+ def get_connection(self, database_name):
+ """Return a Connection for the named database."""
+ connection = self.connections.get(database_name)
+ if connection is None:
+ new_con = self._db.databases[database_name].open()
+ self.connections.update(new_con.connections)
+ new_con.connections = self.connections
+ connection = new_con
+ return connection
+
+ def sync(self):
+ """Manually update the view on the database."""
+ self._txn_mgr.get().abort()
+ sync = getattr(self._storage, 'sync', 0)
+ if sync:
+ sync()
+ self._flush_invalidations()
+
+ def getDebugInfo(self):
+ """Returns a tuple with different items for debugging the
+ connection.
+ """
+ return self._debug_info
+
+ def setDebugInfo(self, *args):
+ """Add the given items to the debug information of this connection."""
+ self._debug_info = self._debug_info + args
+
+ def getTransferCounts(self, clear=False):
+ """Returns the number of objects loaded and stored."""
+ res = self._load_count, self._store_count
+ if clear:
+ self._load_count = 0
+ self._store_count = 0
+ return res
+
+ # Connection methods
+ ##########################################################################
+
+ ##########################################################################
+ # Data manager (IDataManager) methods
+
+ def abort(self, transaction):
+ """Abort a transaction and forget all changes."""
+
+ # The order is important here. We want to abort registered
+ # objects before we process the cache. Otherwise, we may un-add
+ # objects added in savepoints. If they've been modified since
+ # the savepoint, then they won't have _p_oid or _p_jar after
+ # they've been unadded. This will make the code in _abort
+ # confused.
+
+
+ self._abort()
+
+ if self._savepoint_storage is not None:
+ self._abort_savepoint()
+
+ self._tpc_cleanup()
+
+ def _abort(self):
+ """Abort a transaction and forget all changes."""
+
+ for obj in self._registered_objects:
+ oid = obj._p_oid
+ assert oid is not None
+ if oid in self._added:
+ del self._added[oid]
+ del obj._p_jar
+ del obj._p_oid
+ else:
+
+ # Note: If we invalidate a non-ghostifiable object
+ # (i.e. a persistent class), the object will
+ # immediately reread it's state. That means that the
+ # following call could result in a call to
+ # self.setstate, which, of course, must suceed.
+ # In general, it would be better if the read could be
+ # delayed until the start of the next transaction. If
+ # we read at the end of a transaction and if the
+ # object was invalidated during this transaction, then
+ # we'll read non-current data, which we'll discard
+ # later in transaction finalization. Unfortnately, we
+ # can only delay the read if this abort corresponds to
+ # a top-level-transaction abort. We can't tell if
+ # this is a top-level-transaction abort, so we have to
+ # go ahead and invalidate now. Fortunately, it's
+ # pretty unlikely that the object we are invalidating
+ # was invalidated by another thread, so the risk of a
+ # reread is pretty low.
+
+ self._cache.invalidate(oid)
+
+ def _tpc_cleanup(self):
+ """Performs cleanup operations to support tpc_finish and tpc_abort."""
+ self._conflicts.clear()
+ if not self._synch:
+ self._flush_invalidations()
+ self._needs_to_join = True
+ self._registered_objects = []
+
+ def _flush_invalidations(self):
+ self._inv_lock.acquire()
+ try:
+
+ # Non-ghostifiable objects may need to read when they are
+ # invalidated, so, we'll quickly just replace the
+ # invalidating dict with a new one. We'll then process
+ # the invalidations after freeing the lock *and* after
+ # reseting the time. This means that invalidations will
+ # happen after the start of the transactions. They are
+ # subject to conflict errors and to reading old data,
+
+ # TODO: There is a potential problem lurking for persistent
+ # classes. Suppose we have an invlidation of a persistent
+ # class and of an instance. If the instance is
+ # invalidated first and if the invalidation logic uses
+ # data read from the class, then the invalidation could
+ # be performed with state data. Or, suppose that there
+ # are instances of the class that are freed as a result of
+ # invalidating some object. Perhaps code in their __del__
+ # uses class data. Really, the only way to properly fix
+ # this is to, in fact, make classes ghostifiable. Then
+ # we'd have to reimplement attribute lookup to check the
+ # class state and, if necessary, activate the class. It's
+ # much worse than that though, because we'd also need to
+ # deal with slots. When a class is ghostified, we'd need
+ # to replace all of the slot operations with versions that
+ # reloaded the object when caled. It's hard to say which
+ # is better for worse. For now, it seems the risk of
+ # using a class while objects are being invalidated seems
+ # small enough t be acceptable.
+
+ invalidated = self._invalidated
+ self._invalidated = {}
+ self._txn_time = None
+ finally:
+ self._inv_lock.release()
+
+ self._cache.invalidate(invalidated)
+
+ # Now is a good time to collect some garbage.
+ self._cache.incrgc()
+
+ def tpc_begin(self, transaction):
+ """Begin commit of a transaction, starting the two-phase commit."""
+ self._modified = []
+
+ # _creating is a list of oids of new objects, which is used to
+ # remove them from the cache if a transaction aborts.
+ self._creating = []
+ self._normal_storage.tpc_begin(transaction)
+
def commit(self, transaction):
"""Commit changes to an object"""
+
+ if self._savepoint_storage is not None:
+
+ # We first checkpoint the current changes to the savepoint
+ self.savepoint()
+
+ # then commit all of the savepoint changes at once
+ self._commit_savepoint(transaction)
+
+ # No need to call _commit since savepoint did.
+
+ else:
+ self._commit(transaction)
+
+ def _commit(self, transaction):
+ """Commit changes to an object"""
+
if self._import:
# TODO: This code seems important for Zope, but needs docs
# to explain why.
@@ -377,169 +527,6 @@
self._handle_serial(s, oid)
- def commit_sub(self, t):
- """Commit all changes made in subtransactions and begin 2-phase commit
- """
- if self._tmp is None:
- return
- src = self._storage
- self._storage = self._tmp
- self._tmp = None
-
- self._log.debug("Commiting subtransaction of size %s", src.getSize())
- oids = src._index.keys()
- self._storage.tpc_begin(t)
-
- # Copy invalidating and creating info from temporary storage:
- self._modified.extend(oids)
- self._creating.extend(src._creating)
-
- for oid in oids:
- data, serial = src.load(oid, src)
- s = self._storage.store(oid, serial, data, self._version, t)
- self._handle_serial(s, oid, change=False)
-
- def abort_sub(self, t):
- """Discard all subtransaction data."""
- if self._tmp is None:
- return
- src = self._storage
- self._storage = self._tmp
- self._tmp = None
-
- # Note: If we invalidate a non-ghostifiable object (i.e. a
- # persistent class), the object will immediately reread it's
- # state. That means that the following call could result in a
- # call to self.setstate, which, of course, must succeed. In
- # general, it would be better if the read could be delayed
- # until the start of the next transaction. If we read at the
- # end of a transaction and if the object was invalidated
- # during this transaction, then we'll read non-current data,
- # which we'll discard later in transaction finalization. We
- # could, theoretically queue this invalidation by calling
- # self.invalidate. Unfortunately, attempts to make that
- # change resulted in mysterious test failures. It's pretty
- # unlikely that the object we are invalidating was invalidated
- # by another thread, so the risk of a reread is pretty low.
- # It's really not worth the effort to pursue this.
-
- self._cache.invalidate(src._index.keys())
- self._invalidate_creating(src._creating)
-
- def _invalidate_creating(self, creating=None):
- """Disown any objects newly saved in an uncommitted transaction."""
- if creating is None:
- creating = self._creating
- self._creating = []
-
- for oid in creating:
- o = self._cache.get(oid)
- if o is not None:
- del self._cache[oid]
- del o._p_jar
- del o._p_oid
-
- # The next two methods are callbacks for transaction synchronization.
-
- def beforeCompletion(self, txn):
- # We don't do anything before a commit starts.
- pass
-
- def afterCompletion(self, txn):
- self._flush_invalidations()
-
- def _flush_invalidations(self):
- self._inv_lock.acquire()
- try:
-
- # Non-ghostifiable objects may need to read when they are
- # invalidated, so, we'll quickly just replace the
- # invalidating dict with a new one. We'll then process
- # the invalidations after freeing the lock *and* after
- # reseting the time. This means that invalidations will
- # happen after the start of the transactions. They are
- # subject to conflict errors and to reading old data,
-
- # TODO: There is a potential problem lurking for persistent
- # classes. Suppose we have an invlidation of a persistent
- # class and of an instance. If the instance is
- # invalidated first and if the invalidation logic uses
- # data read from the class, then the invalidation could
- # be performed with state data. Or, suppose that there
- # are instances of the class that are freed as a result of
- # invalidating some object. Perhaps code in their __del__
- # uses class data. Really, the only way to properly fix
- # this is to, in fact, make classes ghostifiable. Then
- # we'd have to reimplement attribute lookup to check the
- # class state and, if necessary, activate the class. It's
- # much worse than that though, because we'd also need to
- # deal with slots. When a class is ghostified, we'd need
- # to replace all of the slot operations with versions that
- # reloaded the object when caled. It's hard to say which
- # is better for worse. For now, it seems the risk of
- # using a class while objects are being invalidated seems
- # small enough t be acceptable.
-
- invalidated = self._invalidated
- self._invalidated = {}
- self._txn_time = None
- finally:
- self._inv_lock.release()
-
- self._cache.invalidate(invalidated)
-
- # Now is a good time to collect some garbage.
- self._cache.incrgc()
-
- def root(self):
- """Return the database root object."""
- return self.get(z64)
-
- def db(self):
- """Returns a handle to the database this connection belongs to."""
- return self._db
-
- def isReadOnly(self):
- """Returns True if the storage for this connection is read only."""
- if self._storage is None:
- raise ConnectionStateError("The database connection is closed")
- return self._storage.isReadOnly()
-
- def invalidate(self, tid, oids):
- """Notify the Connection that transaction 'tid' invalidated oids."""
- self._inv_lock.acquire()
- try:
- if self._txn_time is None:
- self._txn_time = tid
- self._invalidated.update(oids)
- finally:
- self._inv_lock.release()
-
- # IDataManager
-
- def tpc_begin(self, transaction, sub=False):
- """Begin commit of a transaction, starting the two-phase commit."""
- self._modified = []
-
- # _creating is a list of oids of new objects, which is used to
- # remove them from the cache if a transaction aborts.
- self._creating = []
- if sub and self._tmp is None:
- # Sub-transaction!
- self._tmp = self._storage
- self._storage = TmpStore(self._version, self._storage)
-
- self._storage.tpc_begin(transaction)
-
- def tpc_vote(self, transaction):
- """Verify that a data manager can commit the transaction."""
- try:
- vote = self._storage.tpc_vote
- except AttributeError:
- return
- s = vote(transaction)
- self._handle_serial(s)
-
def _handle_serial(self, store_return, oid=None, change=1):
"""Handle the returns from store() and tpc_vote() calls."""
@@ -582,26 +569,13 @@
obj._p_changed = 0 # transition from changed to up-to-date
obj._p_serial = serial
- def tpc_finish(self, transaction):
- """Indicate confirmation that the transaction is done."""
- if self._tmp is not None:
- # Commiting a subtransaction!
- # There is no need to invalidate anything.
- self._storage.tpc_finish(transaction)
- self._storage._creating[:0]=self._creating
- del self._creating[:]
- else:
- def callback(tid):
- d = {}
- for oid in self._modified:
- d[oid] = 1
- self._db.invalidate(tid, d, self)
- self._storage.tpc_finish(transaction, callback)
- self._tpc_cleanup()
-
def tpc_abort(self, transaction):
if self._import:
self._import = None
+
+ if self._savepoint_storage is not None:
+ self._abort_savepoint()
+
self._storage.tpc_abort(transaction)
# Note: If we invalidate a non-justifiable object (i.e. a
@@ -628,41 +602,59 @@
del obj._p_jar
self._tpc_cleanup()
- def _tpc_cleanup(self):
- """Performs cleanup operations to support tpc_finish and tpc_abort."""
- self._conflicts.clear()
- if not self._synch:
- self._flush_invalidations()
- self._needs_to_join = True
- self._registered_objects = []
+ def _invalidate_creating(self, creating=None):
+ """Disown any objects newly saved in an uncommitted transaction."""
+ if creating is None:
+ creating = self._creating
+ self._creating = []
- def sync(self):
- """Manually update the view on the database."""
- self._txn_mgr.get().abort()
- sync = getattr(self._storage, 'sync', 0)
- if sync:
- sync()
- self._flush_invalidations()
+ for oid in creating:
+ o = self._cache.get(oid)
+ if o is not None:
+ del self._cache[oid]
+ del o._p_jar
+ del o._p_oid
- def getDebugInfo(self):
- """Returns a tuple with different items for debugging the
- connection.
- """
- return self._debug_info
+ def tpc_vote(self, transaction):
+ """Verify that a data manager can commit the transaction."""
+ try:
+ vote = self._storage.tpc_vote
+ except AttributeError:
+ return
+ s = vote(transaction)
+ self._handle_serial(s)
- def setDebugInfo(self, *args):
- """Add the given items to the debug information of this connection."""
- self._debug_info = self._debug_info + args
+ def tpc_finish(self, transaction):
+ """Indicate confirmation that the transaction is done."""
+ def callback(tid):
+ d = {}
+ for oid in self._modified:
+ d[oid] = 1
+ self._db.invalidate(tid, d, self)
+ self._storage.tpc_finish(transaction, callback)
+ self._tpc_cleanup()
- def getTransferCounts(self, clear=False):
- """Returns the number of objects loaded and stored."""
- res = self._load_count, self._store_count
- if clear:
- self._load_count = 0
- self._store_count = 0
- return res
+ def sortKey(self):
+ """Return a consistent sort key for this connection."""
+ return "%s:%s" % (self._storage.sortKey(), id(self))
- ##############################################
+ # Data manager (IDataManager) methods
+ ##########################################################################
+
+ ##########################################################################
+ # Transaction-manager synchronization -- ISynchronizer
+
+ def beforeCompletion(self, txn):
+ # We don't do anything before a commit starts.
+ pass
+
+ def afterCompletion(self, txn):
+ self._flush_invalidations()
+
+ # Transaction-manager synchronization -- ISynchronizer
+ ##########################################################################
+
+ ##########################################################################
# persistent.interfaces.IPersistentDatamanager
def oldstate(self, obj, tid):
@@ -818,12 +810,24 @@
self._register(obj)
def _register(self, obj=None):
- if obj is not None:
- self._registered_objects.append(obj)
+
+ # The order here is important. We need to join before
+ # registering the object, because joining may take a
+ # savepoint, and the savepoint should not reflect the change
+ # to the object.
+
if self._needs_to_join:
self._txn_mgr.get().join(self)
self._needs_to_join = False
+ if obj is not None:
+ self._registered_objects.append(obj)
+
+
+ # persistent.interfaces.IPersistentDatamanager
+ ##########################################################################
+
+ ##########################################################################
# PROTECTED stuff (used by e.g. ZODB.DB.DB)
def _cache_items(self):
@@ -862,7 +866,7 @@
# and Storage.
self._db = odb
- self._storage = odb._storage
+ self._normal_storage = self._storage = odb._storage
self.new_oid = odb._storage.new_oid
self._opened = time()
if synch is not None:
@@ -892,6 +896,7 @@
cache_size = self._cache.cache_size
self._cache = cache = PickleCache(self, cache_size)
+ ##########################################################################
# Python protocol
def __repr__(self):
@@ -901,6 +906,10 @@
ver = ''
return '<Connection at %08x%s>' % (positive_id(self), ver)
+ # Python protocol
+ ##########################################################################
+
+ ##########################################################################
# DEPRECATION candidates
__getitem__ = get
@@ -916,33 +925,6 @@
except KeyError:
return self.getVersion()
- def getVersion(self):
- """Returns the version this connection is attached to."""
- if self._storage is None:
- raise ConnectionStateError("The database connection is closed")
- return self._version
-
- def setklassstate(self, obj):
- # Special case code to handle ZClasses, I think.
- # Called the cache when an object of type type is invalidated.
- try:
- oid = obj._p_oid
- p, serial = self._storage.load(oid, self._version)
-
- # We call getGhost(), but we actually get a non-ghost back.
- # The object is a class, which can't actually be ghosted.
- copy = self._reader.getGhost(p)
- obj.__dict__.clear()
- obj.__dict__.update(copy.__dict__)
-
- obj._p_oid = oid
- obj._p_jar = self
- obj._p_changed = 0
- obj._p_serial = serial
- except:
- self._log.error("setklassstate failed", exc_info=sys.exc_info())
- raise
-
def exchange(self, old, new):
# called by a ZClasses method that isn't executed by the test suite
oid = old._p_oid
@@ -952,8 +934,20 @@
self._register(new)
self._cache[oid] = new
+ # DEPRECATION candidates
+ ##########################################################################
+
+ ##########################################################################
# DEPRECATED methods
+ def cacheFullSweep(self, dt=None):
+ deprecated36("cacheFullSweep is deprecated. "
+ "Use cacheMinimize instead.")
+ if dt is None:
+ self._cache.full_sweep()
+ else:
+ self._cache.full_sweep(dt)
+
def getTransaction(self):
"""Get the current transaction for this connection.
@@ -986,16 +980,151 @@
self._txn_mgr.registerSynch(self)
return self._txn_mgr
- def cacheFullSweep(self, dt=None):
- deprecated36("cacheFullSweep is deprecated. "
- "Use cacheMinimize instead.")
- if dt is None:
- self._cache.full_sweep()
- else:
- self._cache.full_sweep(dt)
+ # DEPRECATED methods
+ ##########################################################################
- def cacheMinimize(self, dt=DEPRECATED_ARGUMENT):
- """Deactivate all unmodified objects in the cache."""
- if dt is not DEPRECATED_ARGUMENT:
- deprecated36("cacheMinimize() dt= is ignored.")
- self._cache.minimize()
+ #####################################################################
+ # Savepoint support
+
+ def savepoint(self):
+ if self._savepoint_storage is None:
+ self._savepoint_storage = TmpStore(self._version,
+ self._normal_storage)
+ self._storage = self._savepoint_storage
+
+ self._creating = []
+ self._commit(None)
+ self._storage.creating.extend(self._creating)
+ del self._creating[:]
+ self._registered_objects = []
+
+ state = self._storage.position, self._storage.index.copy()
+ return Savepoint(self, state)
+
+ def _rollback(self, state):
+ self._abort()
+ self._registered_objects = []
+ src = self._storage
+ self._cache.invalidate(src.index)
+ src.reset(*state)
+
+ def _commit_savepoint(self, transaction):
+ """Commit all changes made in subtransactions and begin 2-phase commit
+ """
+ src = self._savepoint_storage
+ self._storage = self._normal_storage
+ self._savepoint_storage = None
+
+ self._log.debug("Commiting savepoints of size %s", src.getSize())
+ oids = src.index.keys()
+
+ # Copy invalidating and creating info from temporary storage:
+ self._modified.extend(oids)
+ self._creating.extend(src.creating)
+
+ for oid in oids:
+ data, serial = src.load(oid, src)
+ s = self._storage.store(oid, serial, data,
+ self._version, transaction)
+ self._handle_serial(s, oid, change=False)
+
+ src.close()
+
+ def _abort_savepoint(self):
+ """Discard all subtransaction data."""
+ src = self._savepoint_storage
+ self._storage = self._normal_storage
+ self._savepoint_storage = None
+
+ # Note: If we invalidate a non-ghostifiable object (i.e. a
+ # persistent class), the object will immediately reread it's
+ # state. That means that the following call could result in a
+ # call to self.setstate, which, of course, must succeed. In
+ # general, it would be better if the read could be delayed
+ # until the start of the next transaction. If we read at the
+ # end of a transaction and if the object was invalidated
+ # during this transaction, then we'll read non-current data,
+ # which we'll discard later in transaction finalization. We
+ # could, theoretically queue this invalidation by calling
+ # self.invalidate. Unfortunately, attempts to make that
+ # change resulted in mysterious test failures. It's pretty
+ # unlikely that the object we are invalidating was invalidated
+ # by another thread, so the risk of a reread is pretty low.
+ # It's really not worth the effort to pursue this.
+
+ self._cache.invalidate(src.index)
+ self._invalidate_creating(src.creating)
+ src.close()
+
+ # Savepoint support
+ #####################################################################
+
+class Savepoint:
+
+ def __init__(self, datamanager, state):
+ self.datamanager = datamanager
+ self.state = state
+
+ def rollback(self):
+ self.datamanager._rollback(self.state)
+
+class TmpStore:
+ """A storage-like thing to support savepoints."""
+
+ def __init__(self, base_version, storage):
+ self._storage = storage
+ for method in (
+ 'getName', 'new_oid', 'modifiedInVersion', 'getSize',
+ 'undoLog', 'versionEmpty', 'sortKey',
+ ):
+ setattr(self, method, getattr(storage, method))
+
+ self._base_version = base_version
+ self._file = tempfile.TemporaryFile()
+ # position: current file position
+ # _tpos: file position at last commit point
+ self.position = 0L
+ # index: map oid to pos of last committed version
+ self.index = {}
+ self.creating = []
+
+ def __len__(self):
+ return len(self.index)
+
+ def close(self):
+ self._file.close()
+
+ def load(self, oid, version):
+ pos = self.index.get(oid)
+ if pos is None:
+ return self._storage.load(oid, self._base_version)
+ self._file.seek(pos)
+ h = self._file.read(8)
+ oidlen = u64(h)
+ read_oid = self._file.read(oidlen)
+ if read_oid != oid:
+ raise POSException.StorageSystemError('Bad temporary storage')
+ h = self._file.read(16)
+ size = u64(h[8:])
+ serial = h[:8]
+ return self._file.read(size), serial
+
+ def store(self, oid, serial, data, version, transaction):
+ # we have this funny signature so we can reuse the normal non-commit
+ # commit logic
+ assert version == self._base_version
+ self._file.seek(self.position)
+ l = len(data)
+ if serial is None:
+ serial = z64
+ header = p64(len(oid)) + oid + serial + p64(l)
+ self._file.write(header)
+ self._file.write(data)
+ self.index[oid] = self.position
+ self.position += l + len(header)
+ return serial
+
+ def reset(self, position, index):
+ self._file.truncate(position)
+ self.position = position
+ self.index = index
Deleted: ZODB/trunk/src/ZODB/TmpStore.py
===================================================================
--- ZODB/trunk/src/ZODB/TmpStore.py 2005-04-24 04:26:04 UTC (rev 30135)
+++ ZODB/trunk/src/ZODB/TmpStore.py 2005-04-24 05:13:15 UTC (rev 30136)
@@ -1,126 +0,0 @@
-##############################################################################
-#
-# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
-# All Rights Reserved.
-#
-# This software is subject to the provisions of the Zope Public License,
-# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
-# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
-# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
-# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
-# FOR A PARTICULAR PURPOSE
-#
-##############################################################################
-
-from ZODB import POSException
-from ZODB.utils import p64, u64, z64
-
-import tempfile
-
-class TmpStore:
- """A storage to support subtransactions."""
-
- _bver = ''
-
- def __init__(self, base_version, storage):
- self._transaction = None
- self._storage = storage
- if base_version:
- self._bver = base_version
- self._file = tempfile.TemporaryFile()
- # _pos: current file position
- # _tpos: file position at last commit point
- self._pos = self._tpos = 0L
- # _index: map oid to pos of last committed version
- self._index = {}
- # _tindex: map oid to pos for new updates
- self._tindex = {}
- self._creating = []
-
- def close(self):
- self._file.close()
-
- def getName(self):
- return self._storage.getName()
-
- def getSize(self):
- return self._pos
-
- def load(self, oid, version):
- pos = self._index.get(oid)
- if pos is None:
- return self._storage.load(oid, self._bver)
- self._file.seek(pos)
- h = self._file.read(8)
- oidlen = u64(h)
- read_oid = self._file.read(oidlen)
- if read_oid != oid:
- raise POSException.StorageSystemError('Bad temporary storage')
- h = self._file.read(16)
- size = u64(h[8:])
- serial = h[:8]
- return self._file.read(size), serial
-
- def sortKey(self):
- return self._storage.sortKey()
-
- # TODO: clarify difference between self._storage & self._db._storage
-
- def modifiedInVersion(self, oid):
- if self._index.has_key(oid):
- return self._bver
- return self._storage.modifiedInVersion(oid)
-
- def new_oid(self):
- return self._storage.new_oid()
-
- def registerDB(self, db, limit):
- pass
-
- def store(self, oid, serial, data, version, transaction):
- if transaction is not self._transaction:
- raise POSException.StorageTransactionError(self, transaction)
- self._file.seek(self._pos)
- l = len(data)
- if serial is None:
- serial = z64
- header = p64(len(oid)) + oid + serial + p64(l)
- self._file.write(header)
- self._file.write(data)
- self._tindex[oid] = self._pos
- self._pos += l + len(header)
- return serial
-
- def tpc_abort(self, transaction):
- if transaction is not self._transaction:
- return
- self._tindex.clear()
- self._transaction = None
- self._pos = self._tpos
-
- def tpc_begin(self, transaction):
- if self._transaction is transaction:
- return
- self._transaction = transaction
- self._tindex.clear() # Just to be sure!
- self._pos = self._tpos
-
- def tpc_vote(self, transaction):
- pass
-
- def tpc_finish(self, transaction, f=None):
- if transaction is not self._transaction:
- return
- if f is not None:
- f()
- self._index.update(self._tindex)
- self._tindex.clear()
- self._tpos = self._pos
-
- def undoLog(self, first, last, filter=None):
- return ()
-
- def versionEmpty(self, version):
- # TODO: what is this supposed to do?
- if version == self._bver:
- return len(self._index)
Modified: ZODB/trunk/src/ZODB/tests/testConnection.py
===================================================================
--- ZODB/trunk/src/ZODB/tests/testConnection.py 2005-04-24 04:26:04 UTC (rev 30135)
+++ ZODB/trunk/src/ZODB/tests/testConnection.py 2005-04-24 05:13:15 UTC (rev 30136)
@@ -294,7 +294,7 @@
>>> cn.close() # this was succeeding
Traceback (most recent call last):
...
- ConnectionStateError: Cannot close a connection with a pending subtransaction
+ ConnectionStateError: Cannot close a connection joined to a transaction
Again this leaves the connection as it was.
>>> transaction.commit()
Copied: ZODB/trunk/src/ZODB/tests/testConnectionSavepoint.py (from rev 30131, ZODB/branches/3.4/src/ZODB/tests/testConnectionSavepoint.py)
Property changes on: ZODB/trunk/src/ZODB/tests/testConnectionSavepoint.py
___________________________________________________________________
Name: svn:keywords
+ Id
Name: svn:eol-style
+ native
Copied: ZODB/trunk/src/ZODB/tests/testConnectionSavepoint.txt (from rev 30131, ZODB/branches/3.4/src/ZODB/tests/testConnectionSavepoint.txt)
Property changes on: ZODB/trunk/src/ZODB/tests/testConnectionSavepoint.txt
___________________________________________________________________
Name: svn:eol-style
+ native
Modified: ZODB/trunk/src/ZODB/tests/testZODB.py
===================================================================
--- ZODB/trunk/src/ZODB/tests/testZODB.py 2005-04-24 04:26:04 UTC (rev 30135)
+++ ZODB/trunk/src/ZODB/tests/testZODB.py 2005-04-24 05:13:15 UTC (rev 30136)
@@ -550,13 +550,17 @@
self.assertEqual(rt['a'], 1)
rt['b'] = 2
+
# Subtransactions don't do tpc_vote, so we poison tpc_begin.
- poisoned = PoisonedObject(PoisonedJar(break_tpc_begin=True))
- transaction.get().register(poisoned)
+ poisoned = PoisonedJar()
+ transaction.get().join(poisoned)
+ poisoned.break_savepoint = True
self.assertRaises(PoisonedError, transaction.get().commit, True)
# Trying to subtxn-commit again fails too.
- self.assertRaises(TransactionFailedError, transaction.get().commit, True)
- self.assertRaises(TransactionFailedError, transaction.get().commit, True)
+ self.assertRaises(TransactionFailedError,
+ transaction.get().commit, True)
+ self.assertRaises(TransactionFailedError,
+ transaction.get().commit, True)
# Top-level commit also fails.
self.assertRaises(TransactionFailedError, transaction.get().commit)
@@ -568,6 +572,7 @@
# also raises TransactionFailedError.
self.assertRaises(TransactionFailedError, rt.__setitem__, 'b', 2)
+
# Clean up via abort(), and try again.
transaction.get().abort()
rt['a'] = 1
@@ -576,13 +581,18 @@
# Cleaning up via begin() should also work.
rt['a'] = 2
- transaction.get().register(poisoned)
+ poisoned = PoisonedJar()
+ transaction.get().join(poisoned)
+ poisoned.break_savepoint = True
self.assertRaises(PoisonedError, transaction.get().commit, True)
- self.assertRaises(TransactionFailedError, transaction.get().commit, True)
+ self.assertRaises(TransactionFailedError,
+ transaction.get().commit, True)
+
# The change to rt['a'] is lost.
self.assertEqual(rt['a'], 1)
# Trying to modify an object also fails.
self.assertRaises(TransactionFailedError, rt.__setitem__, 'b', 2)
+
# Clean up via begin(), and try again.
transaction.begin()
rt['a'] = 2
@@ -603,9 +613,11 @@
# PoisonedJar arranges to raise exceptions from interesting places.
# For whatever reason, subtransaction commits don't call tpc_vote.
class PoisonedJar:
- def __init__(self, break_tpc_begin=False, break_tpc_vote=False):
+ def __init__(self, break_tpc_begin=False, break_tpc_vote=False,
+ break_savepoint=False):
self.break_tpc_begin = break_tpc_begin
self.break_tpc_vote = break_tpc_vote
+ self.break_savepoint = break_savepoint
def sortKey(self):
return str(id(self))
@@ -620,14 +632,10 @@
if self.break_tpc_vote:
raise PoisonedError("tpc_vote fails")
- # commit_sub is needed else this jar is ignored during subtransaction
- # commit.
- def commit_sub(*args):
- pass
+ def savepoint(self):
+ if self.break_savepoint:
+ raise PoisonedError("savepoint fails")
- def abort_sub(*args):
- pass
-
def commit(*args):
pass
Modified: ZODB/trunk/src/ZODB/tests/test_datamanageradapter.py
===================================================================
--- ZODB/trunk/src/ZODB/tests/test_datamanageradapter.py 2005-04-24 04:26:04 UTC (rev 30135)
+++ ZODB/trunk/src/ZODB/tests/test_datamanageradapter.py 2005-04-24 05:13:15 UTC (rev 30136)
@@ -238,569 +238,6 @@
False
"""
-def test_commit_w_subtransactions():
- """
- So, we have a data manager:
-
- >>> dm = DataManager()
-
- and we do some work that modifies uncommited state:
-
- >>> dm.inc()
- >>> dm.state, dm.delta
- (0, 1)
-
- Now we'll commit the changes in a subtransaction. When the data
- manager joins a transaction, the transaction will create an
- adapter.
-
- >>> dma = DataManagerAdapter(dm)
-
- and register it as a modified object. At commit time, the
- transaction will get the "jar" like this:
-
- >>> jar = getattr(dma, '_p_jar', dma)
-
- and, of course, the jar and the adapter will be the same:
-
- >>> jar is dma
- True
-
- The transaction will call tpc_begin:
-
- >>> t1 = '1'
- >>> jar.tpc_begin(t1, 1) # 1 -> subtxn
-
- Then the transaction will call commit on the jar:
-
- >>> jar.commit(t1)
-
- This doesn't actually do anything. :)
-
- >>> dm.state, dm.delta
- (0, 1)
-
- The transaction will then call tpc_vote:
-
- >>> jar.tpc_vote(t1)
-
- This doesn't do anything either, because zodb4 data managers don't
- actually do two-phase commit for subtransactions.
-
- >>> dm.state, dm.delta
- (0, 1)
-
- Finally, we call tpc_finish. This does actally create a savepoint,
- but we can't really tell that from outside.
-
- >>> jar.tpc_finish(t1)
- >>> dm.state, dm.delta
- (0, 1)
-
- We'll do more of the above:
-
- >>> dm.inc()
- >>> dm.state, dm.delta
- (0, 2)
- >>> jar.tpc_begin(t1, 1) # 1 -> subtxn
- >>> jar.commit(t1)
- >>> jar.tpc_vote(t1)
- >>> jar.tpc_finish(t1)
- >>> dm.state, dm.delta
- (0, 2)
- >>> dm.inc()
- >>> dm.state, dm.delta
- (0, 3)
- >>> jar.tpc_begin(t1, 1) # 1 -> subtxn
- >>> jar.commit(t1)
- >>> jar.tpc_vote(t1)
- >>> jar.tpc_finish(t1)
- >>> dm.state, dm.delta
- (0, 3)
-
- Note that the bove works *because* the same transaction is used
- for each subtransaction.
-
- Finally, we'll do a little more work:
-
- >>> dm.inc()
- >>> dm.inc()
- >>> dm.state, dm.delta
- (0, 5)
-
- and then commit the top-level transaction.
-
- The transaction will actually go through the steps for a subtransaction:
-
- >>> jar.tpc_begin(t1, 1) # 1 -> subtxn
- >>> jar.commit(t1)
- >>> jar.tpc_vote(t1)
- >>> jar.tpc_finish(t1)
-
- And then call commit_sub:
-
- >>> jar.commit_sub(t1)
-
- As usual, this doesn't actually do anything. ;)
-
- >>> dm.state, dm.delta
- (0, 5)
-
- The transaction manager doesn's call tpc_begin, because commit_sub
- implies the start of two-phase commit. Next, it does call commit:
-
- >>> jar.commit(t1)
-
- which doesn't do anything.
-
- Finally, the transaction calls tpc_vote:
-
- >>> jar.tpc_vote(t1)
-
- which actually does something (because this is the top-level txn):
-
- >>> dm.state, dm.delta
- (5, 5)
- >>> dm.prepared
- True
-
- Finally, tpc_finish is called:
-
- >>> jar.tpc_finish(t1)
-
- and the data manager finishes the two-phase commit:
-
- >>> dm.state, dm.delta
- (5, 0)
- >>> dm.prepared
- False
- """
-
-def test_commit_w_subtransactions_featuring_subtransaction_abort():
- """
- So, we have a data manager:
-
- >>> dm = DataManager()
-
- and we do some work that modifies uncommited state:
-
- >>> dm.inc()
- >>> dm.state, dm.delta
- (0, 1)
-
- Now we'll commit the changes in a subtransaction. When the data
- manager joins a transaction, the transaction will create an
- adapter.
-
- >>> dma = DataManagerAdapter(dm)
-
- and register it as a modified object. At commit time, the
- transaction will get the "jar" like this:
-
- >>> jar = getattr(dma, '_p_jar', dma)
-
- and, of course, the jar and the adapter will be the same:
-
- >>> jar is dma
- True
-
- The transaction will call tpc_begin:
-
- >>> t1 = '1'
- >>> jar.tpc_begin(t1, 1) # 1 -> subtxn
-
- Then the transaction will call commit on the jar:
-
- >>> jar.commit(t1)
-
- This doesn't actually do anything. :)
-
- >>> dm.state, dm.delta
- (0, 1)
-
- The transaction will then call tpc_vote:
-
- >>> jar.tpc_vote(t1)
-
- This doesn't do anything either, because zodb4 data managers don't
- actually do two-phase commit for subtransactions.
-
- >>> dm.state, dm.delta
- (0, 1)
-
- Finally, we call tpc_finish. This does actally create a savepoint,
- but we can't really tell that from outside.
-
- >>> jar.tpc_finish(t1)
- >>> dm.state, dm.delta
- (0, 1)
-
- We'll do more of the above:
-
- >>> dm.inc()
- >>> dm.state, dm.delta
- (0, 2)
- >>> jar.tpc_begin(t1, 1) # 1 -> subtxn
- >>> jar.commit(t1)
- >>> jar.tpc_vote(t1)
- >>> jar.tpc_finish(t1)
- >>> dm.state, dm.delta
- (0, 2)
-
-
- >>> dm.inc()
- >>> dm.state, dm.delta
- (0, 3)
-
- But then we'll decide to abort a subtransaction.
-
- The transaction will just call abort as usual:
-
- >>> jar.abort(t1)
-
- This will cause a rollback to the last savepoint:
- >>> dm.state, dm.delta
- (0, 2)
-
- Then we do more work:
-
- >>> dm.inc()
- >>> dm.state, dm.delta
- (0, 3)
- >>> jar.tpc_begin(t1, 1) # 1 -> subtxn
- >>> jar.commit(t1)
- >>> jar.tpc_vote(t1)
- >>> jar.tpc_finish(t1)
- >>> dm.state, dm.delta
- (0, 3)
-
- Note that the bove works *because* the same transaction is used
- for each subtransaction.
-
- Finally, we'll do a little more work:
-
- >>> dm.inc()
- >>> dm.inc()
- >>> dm.state, dm.delta
- (0, 5)
-
- and then commit the top-level transaction.
-
- The transaction will actually go through the steps for a subtransaction:
-
- >>> jar.tpc_begin(t1, 1) # 1 -> subtxn
- >>> jar.commit(t1)
- >>> jar.tpc_vote(t1)
- >>> jar.tpc_finish(t1)
-
- And then call commit_sub:
-
- >>> jar.commit_sub(t1)
-
- As usual, this doesn't actually do anything. ;)
-
- >>> dm.state, dm.delta
- (0, 5)
-
- The transaction manager doesn's call tpc_begin, because commit_sub
- implies the start of two-phase commit. Next, it does call commit:
-
- >>> jar.commit(t1)
-
- which doesn't do anything.
-
- Finally, the transaction calls tpc_vote:
-
- >>> jar.tpc_vote(t1)
-
- which actually does something (because this is the top-level txn):
-
- >>> dm.state, dm.delta
- (5, 5)
- >>> dm.prepared
- True
-
- Finally, tpc_finish is called:
-
- >>> jar.tpc_finish(t1)
-
- and the data manager finishes the two-phase commit:
-
- >>> dm.state, dm.delta
- (5, 0)
- >>> dm.prepared
- False
- """
-
-def test_abort_w_subtransactions():
- """
- So, we have a data manager:
-
- >>> dm = DataManager()
-
- and we do some work that modifies uncommited state:
-
- >>> dm.inc()
- >>> dm.state, dm.delta
- (0, 1)
-
- Now we'll commit the changes in a subtransaction. When the data
- manager joins a transaction, the transaction will create an
- adapter.
-
- >>> dma = DataManagerAdapter(dm)
-
- and register it as a modified object. At commit time, the
- transaction will get the "jar" like this:
-
- >>> jar = getattr(dma, '_p_jar', dma)
-
- and, of course, the jar and the adapter will be the same:
-
- >>> jar is dma
- True
-
- The transaction will call tpc_begin:
-
- >>> t1 = '1'
- >>> jar.tpc_begin(t1, 1) # 1 -> subtxn
-
- Then the transaction will call commit on the jar:
-
- >>> jar.commit(t1)
-
- This doesn't actually do anything. :)
-
- >>> dm.state, dm.delta
- (0, 1)
-
- The transaction will then call tpc_vote:
-
- >>> jar.tpc_vote(t1)
-
- This doesn't do anything either, because zodb4 data managers don't
- actually do two-phase commit for subtransactions.
-
- >>> dm.state, dm.delta
- (0, 1)
-
- Finally, we call tpc_finish. This does actally create a savepoint,
- but we can't really tell that from outside.
-
- >>> jar.tpc_finish(t1)
- >>> dm.state, dm.delta
- (0, 1)
-
- We'll do more of the above:
-
- >>> dm.inc()
- >>> dm.state, dm.delta
- (0, 2)
- >>> jar.tpc_begin(t1, 1) # 1 -> subtxn
- >>> jar.commit(t1)
- >>> jar.tpc_vote(t1)
- >>> jar.tpc_finish(t1)
- >>> dm.state, dm.delta
- (0, 2)
- >>> dm.inc()
- >>> dm.state, dm.delta
- (0, 3)
- >>> jar.tpc_begin(t1, 1) # 1 -> subtxn
- >>> jar.commit(t1)
- >>> jar.tpc_vote(t1)
- >>> jar.tpc_finish(t1)
- >>> dm.state, dm.delta
- (0, 3)
-
- Note that the bove works *because* the same transaction is used
- for each subtransaction.
-
- Finally, we'll do a little more work:
-
- >>> dm.inc()
- >>> dm.inc()
- >>> dm.state, dm.delta
- (0, 5)
-
- and then abort the top-level transaction.
-
- The transaction first call abort on the jar:
-
- >>> jar.abort(t1)
-
- This will have the effect of aborting the subtrancation:
-
- >>> dm.state, dm.delta
- (0, 3)
-
- Then the transaction will call abort_sub:
-
- >>> jar.abort_sub(t1)
-
- This will abort all of the subtransactions:
-
- >>> dm.state, dm.delta
- (0, 0)
- """
-
-
-def test_tpc_abort_w_subtransactions_featuring_subtransaction_abort():
- """
- So, we have a data manager:
-
- >>> dm = DataManager()
-
- and we do some work that modifies uncommited state:
-
- >>> dm.inc()
- >>> dm.state, dm.delta
- (0, 1)
-
- Now we'll commit the changes in a subtransaction. When the data
- manager joins a transaction, the transaction will create an
- adapter.
-
- >>> dma = DataManagerAdapter(dm)
-
- and register it as a modified object. At commit time, the
- transaction will get the "jar" like this:
-
- >>> jar = getattr(dma, '_p_jar', dma)
-
- and, of course, the jar and the adapter will be the same:
-
- >>> jar is dma
- True
-
- The transaction will call tpc_begin:
-
- >>> t1 = '1'
- >>> jar.tpc_begin(t1, 1) # 1 -> subtxn
-
- Then the transaction will call commit on the jar:
-
- >>> jar.commit(t1)
-
- This doesn't actually do anything. :)
-
- >>> dm.state, dm.delta
- (0, 1)
-
- The transaction will then call tpc_vote:
-
- >>> jar.tpc_vote(t1)
-
- This doesn't do anything either, because zodb4 data managers don't
- actually do two-phase commit for subtransactions.
-
- >>> dm.state, dm.delta
- (0, 1)
-
- Finally, we call tpc_finish. This does actally create a savepoint,
- but we can't really tell that from outside.
-
- >>> jar.tpc_finish(t1)
- >>> dm.state, dm.delta
- (0, 1)
-
- We'll do more of the above:
-
- >>> dm.inc()
- >>> dm.state, dm.delta
- (0, 2)
- >>> jar.tpc_begin(t1, 1) # 1 -> subtxn
- >>> jar.commit(t1)
- >>> jar.tpc_vote(t1)
- >>> jar.tpc_finish(t1)
- >>> dm.state, dm.delta
- (0, 2)
-
-
- >>> dm.inc()
- >>> dm.state, dm.delta
- (0, 3)
-
- But then we'll decide to abort a subtransaction.
-
- The transaction will just call abort as usual:
-
- >>> jar.abort(t1)
-
- This will cause a rollback to the last savepoint:
- >>> dm.state, dm.delta
- (0, 2)
-
- Then we do more work:
-
- >>> dm.inc()
- >>> dm.state, dm.delta
- (0, 3)
- >>> jar.tpc_begin(t1, 1) # 1 -> subtxn
- >>> jar.commit(t1)
- >>> jar.tpc_vote(t1)
- >>> jar.tpc_finish(t1)
- >>> dm.state, dm.delta
- (0, 3)
-
- Note that the bove works *because* the same transaction is used
- for each subtransaction.
-
- Finally, we'll do a little more work:
-
- >>> dm.inc()
- >>> dm.inc()
- >>> dm.state, dm.delta
- (0, 5)
-
- and then commit the top-level transaction.
-
- The transaction will actually go through the steps for a subtransaction:
-
- >>> jar.tpc_begin(t1, 1) # 1 -> subtxn
- >>> jar.commit(t1)
- >>> jar.tpc_vote(t1)
- >>> jar.tpc_finish(t1)
-
- And then call commit_sub:
-
- >>> jar.commit_sub(t1)
-
- As usual, this doesn't actually do anything. ;)
-
- >>> dm.state, dm.delta
- (0, 5)
-
- The transaction manager doesn's call tpc_begin, because commit_sub
- implies the start of two-phase commit. Next, it does call commit:
-
- >>> jar.commit(t1)
-
- which doesn't do anything.
-
- Finally, the transaction calls tpc_vote:
-
- >>> jar.tpc_vote(t1)
-
- which actually does something (because this is the top-level txn):
-
- >>> dm.state, dm.delta
- (5, 5)
- >>> dm.prepared
- True
-
- Now, at the last minute, the transaction is aborted (possibly due
- to a "no vote" from another data manager):
-
- >>> jar.tpc_abort(t1)
-
- An the changes are undone:
-
- >>> dm.state, dm.delta
- (0, 0)
- >>> dm.prepared
- False
- """
-
def test_suite():
return DocTestSuite()
Modified: ZODB/trunk/src/persistent/interfaces.py
===================================================================
--- ZODB/trunk/src/persistent/interfaces.py 2005-04-24 04:26:04 UTC (rev 30135)
+++ ZODB/trunk/src/persistent/interfaces.py 2005-04-24 05:13:15 UTC (rev 30136)
@@ -288,10 +288,11 @@
policy of one transaction manager for each thread.
"""
- def mtime(object):
- """Return the modification time of the object.
+# Maybe later:
+## def mtime(object):
+## """Return the modification time of the object.
- The modification time may not be known, in which case None
- is returned. If non-None, the return value is the kind of
- timestamp supplied by Python's time.time().
- """
+## The modification time may not be known, in which case None
+## is returned. If non-None, the return value is the kind of
+## timestamp supplied by Python's time.time().
+## """
Modified: ZODB/trunk/src/transaction/__init__.py
===================================================================
--- ZODB/trunk/src/transaction/__init__.py 2005-04-24 04:26:04 UTC (rev 30135)
+++ ZODB/trunk/src/transaction/__init__.py 2005-04-24 05:13:15 UTC (rev 30136)
@@ -24,6 +24,7 @@
begin = manager.begin
commit = manager.commit
abort = manager.abort
+savepoint = manager.savepoint
def get_transaction():
from ZODB.utils import deprecated36
Modified: ZODB/trunk/src/transaction/_manager.py
===================================================================
--- ZODB/trunk/src/transaction/_manager.py 2005-04-24 04:26:04 UTC (rev 30135)
+++ ZODB/trunk/src/transaction/_manager.py 2005-04-24 05:13:15 UTC (rev 30136)
@@ -67,6 +67,9 @@
def abort(self, sub=False):
self.get().abort(sub)
+ def savepoint(self, optimistic=False):
+ return self.get().savepoint(optimistic)
+
class ThreadTransactionManager(TransactionManager):
"""Thread-aware transaction manager.
Modified: ZODB/trunk/src/transaction/_transaction.py
===================================================================
--- ZODB/trunk/src/transaction/_transaction.py 2005-04-24 04:26:04 UTC (rev 30135)
+++ ZODB/trunk/src/transaction/_transaction.py 2005-04-24 05:13:15 UTC (rev 30136)
@@ -30,6 +30,8 @@
Subtransactions
---------------
+Note: Subtransactions are deprecated!
+
A subtransaction applies the transaction notion recursively. It
allows a set of modifications within a transaction to be committed or
aborted as a group. A subtransaction is a strictly local activity;
@@ -82,7 +84,7 @@
tpc_begin() on each resource manager before calling commit() on any of
them.
- 1. tpc_begin(txn, subtransaction=False)
+ 1. tpc_begin(txn)
2. commit(txn)
3. tpc_vote(txn)
4. tpc_finish(txn)
@@ -90,6 +92,8 @@
Subtransaction commit
---------------------
+Note: Subtransactions are deprecated!
+
When a subtransaction commits, the protocol is different.
1. tpc_begin() is passed a second argument, which indicates that a
@@ -128,8 +132,7 @@
call abort().
Once uncommitted objects are aborted, tpc_abort() or abort_sub() is
-called on each resource manager. abort_sub() is called if the
-resource manager was involved in a subtransaction.
+called on each resource manager.
Synchronization
---------------
@@ -213,14 +216,6 @@
self.log = logging.getLogger("txn.%d" % thread.get_ident())
self.log.debug("new transaction")
- # _sub contains all of the resource managers involved in
- # subtransactions. It maps id(a resource manager) to the resource
- # manager.
- self._sub = {}
- # _nonsub contains all the resource managers that do not support
- # subtransactions that were involved in subtransaction commits.
- self._nonsub = {}
-
# If a commit fails, the traceback is saved in _failure_traceback.
# If another attempt is made to commit, TransactionFailedError is
# raised, incorporating this traceback.
@@ -231,6 +226,9 @@
# inefficient for FIFO access of this kind.
self._before_commit = []
+ # Keep track of the last savepoint
+ self._last_savepoint = None
+
# Raise TransactionFailedError, due to commit()/join()/register()
# getting called when the current transaction has already suffered
# a commit failure.
@@ -258,6 +256,34 @@
resource = DataManagerAdapter(resource)
self._resources.append(resource)
+ if self._last_savepoint is not None:
+ self._last_savepoint.join(resource)
+
+ def savepoint(self, optimistic=False):
+ if self.status is Status.COMMITFAILED:
+ self._prior_commit_failed() # doesn't return, it raises
+
+ try:
+ savepoint = Savepoint(optimistic)
+ for resource in self._resources:
+ savepoint.join(resource)
+ except:
+ self._cleanup(self._resources)
+ self._saveCommitishError() # doesn't return, it raises!
+
+ if self._last_savepoint is not None:
+ savepoint.previous = self._last_savepoint
+ self._last_savepoint.next = savepoint
+ self._last_savepoint = savepoint
+ return savepoint
+
+ def _invalidate_last_savepoint(self):
+ # Invalidate the last savepoint and any previous
+ # savepoints. This is done on a commit or abort.
+ if self._last_savepoint is not None:
+ self._last_savepoint._invalidate_previous()
+ self._last_savepoint = None
+
def register(self, obj):
# The old way of registering transaction participants.
#
@@ -273,10 +299,7 @@
raise ValueError("Register with no manager")
adapter = self._adapters.get(manager)
if adapter is None:
- if myhasattr(manager, "commit_sub"):
- adapter = MultiObjectResourceAdapterSub(manager)
- else:
- adapter = MultiObjectResourceAdapter(manager)
+ adapter = MultiObjectResourceAdapter(manager)
adapter.objects.append(obj)
self._adapters[manager] = adapter
self.join(adapter)
@@ -286,67 +309,59 @@
assert id(obj) not in map(id, adapter.objects)
adapter.objects.append(obj)
- # In the presence of subtransactions, an existing adapter
- # might be in _adapters but not in _resources.
- if adapter not in self._resources:
- self._resources.append(adapter)
-
def begin(self):
from ZODB.utils import deprecated36
deprecated36("Transaction.begin() should no longer be used; use "
"the begin() method of a transaction manager.")
- if (self._resources or
- self._sub or
- self._nonsub or
- self._synchronizers):
+ if (self._resources or self._synchronizers):
self.abort()
# Else aborting wouldn't do anything, except if _manager is non-None,
# in which case it would do nothing besides uselessly free() this
# transaction.
def commit(self, subtransaction=False):
+
+ self._invalidate_last_savepoint()
+
+ if subtransaction:
+ # TODO deprecate subtransactions
+ self.savepoint(1)
+ return
+
if self.status is Status.COMMITFAILED:
self._prior_commit_failed() # doesn't return
- if not subtransaction:
- self._callBeforeCommitHooks()
+ self._callBeforeCommitHooks()
- if not subtransaction and self._sub and self._resources:
- # This commit is for a top-level transaction that has
- # previously committed subtransactions. Do one last
- # subtransaction commit to clear out the current objects,
- # then commit all the subjars.
- self.commit(True)
+ self._synchronizers.map(lambda s: s.beforeCompletion(self))
+ self.status = Status.COMMITTING
- if not subtransaction:
- self._synchronizers.map(lambda s: s.beforeCompletion(self))
- self.status = Status.COMMITTING
-
try:
- self._commitResources(subtransaction)
+ self._commitResources()
except:
- self.status = Status.COMMITFAILED
- # Save the traceback for TransactionFailedError.
- ft = self._failure_traceback = StringIO()
- t, v, tb = sys.exc_info()
- # Record how we got into commit().
- traceback.print_stack(sys._getframe(1), None, ft)
- # Append the stack entries from here down to the exception.
- traceback.print_tb(tb, None, ft)
- # Append the exception type and value.
- ft.writelines(traceback.format_exception_only(t, v))
- raise t, v, tb
+ self._saveCommitishError() # This raises!
- if subtransaction:
- self._resources = []
- else:
- self.status = Status.COMMITTED
- if self._manager:
- self._manager.free(self)
- self._synchronizers.map(lambda s: s.afterCompletion(self))
- self.log.debug("commit")
+ self.status = Status.COMMITTED
+ if self._manager:
+ self._manager.free(self)
+ self._synchronizers.map(lambda s: s.afterCompletion(self))
+ self.log.debug("commit")
+ def _saveCommitishError(self):
+ self.status = Status.COMMITFAILED
+ # Save the traceback for TransactionFailedError.
+ ft = self._failure_traceback = StringIO()
+ t, v, tb = sys.exc_info()
+ # Record how we got into commit().
+ traceback.print_stack(sys._getframe(1), None, ft)
+ # Append the stack entries from here down to the exception.
+ traceback.print_tb(tb, None, ft)
+ # Append the exception type and value.
+ ft.writelines(traceback.format_exception_only(t, v))
+ raise t, v, tb
+
+
def beforeCommitHook(self, hook, *args, **kws):
self._before_commit.append((hook, args, kws))
@@ -357,31 +372,20 @@
hook, args, kws = self._before_commit.pop(0)
hook(*args, **kws)
- def _commitResources(self, subtransaction):
+ def _commitResources(self):
# Execute the two-phase commit protocol.
- L = self._getResourceManagers(subtransaction)
+ L = list(self._resources)
+ L.sort(rm_cmp)
try:
for rm in L:
- # If you pass subtransaction=True to tpc_begin(), it
- # will create a temporary storage for the duration of
- # the transaction. To signal that the top-level
- # transaction is committing, you must then call
- # commit_sub().
- if not subtransaction and id(rm) in self._sub:
- del self._sub[id(rm)]
- rm.commit_sub(self)
- else:
- rm.tpc_begin(self, subtransaction)
+ rm.tpc_begin(self)
for rm in L:
rm.commit(self)
self.log.debug("commit %r" % rm)
- if not subtransaction:
- # Not sure why, but it is intentional that you do not
- # call tpc_vote() for subtransaction commits.
- for rm in L:
- rm.tpc_vote(self)
- self._voted[id(rm)] = True
+ for rm in L:
+ rm.tpc_vote(self)
+ self._voted[id(rm)] = True
try:
for rm in L:
@@ -401,8 +405,7 @@
try:
self._cleanup(L)
finally:
- if not subtransaction:
- self._synchronizers.map(lambda s: s.afterCompletion(self))
+ self._synchronizers.map(lambda s: s.afterCompletion(self))
raise t, v, tb
def _cleanup(self, L):
@@ -415,68 +418,30 @@
self.log.error("Error in abort() on manager %s",
rm, exc_info=sys.exc_info())
for rm in L:
- if id(rm) in self._sub:
- try:
- rm.abort_sub(self)
- except Exception:
- self.log.error("Error in abort_sub() on manager %s",
- rm, exc_info=sys.exc_info())
- else:
- try:
- rm.tpc_abort(self)
- except Exception:
- self.log.error("Error in tpc_abort() on manager %s",
- rm, exc_info=sys.exc_info())
+ try:
+ rm.tpc_abort(self)
+ except Exception:
+ self.log.error("Error in tpc_abort() on manager %s",
+ rm, exc_info=sys.exc_info())
- def _getResourceManagers(self, subtransaction):
- L = []
- if subtransaction:
- # If we are in a subtransaction, make sure all resource
- # managers are placed in either _sub or _nonsub. When
- # the top-level transaction commits, we need to merge
- # these back into the resource set.
-
- # If a data manager doesn't support sub-transactions, we
- # don't do anything with it now. (That's somewhat okay,
- # because subtransactions are mostly just an
- # optimization.) Save it until the top-level transaction
- # commits.
-
- for rm in self._resources:
- if myhasattr(rm, "commit_sub"):
- self._sub[id(rm)] = rm
- L.append(rm)
- else:
- self._nonsub[id(rm)] = rm
- else:
- if self._sub or self._nonsub:
- # Merge all of _sub, _nonsub, and _resources.
- d = dict(self._sub)
- d.update(self._nonsub)
- # TODO: I think _sub and _nonsub are disjoint, and that
- # _resources is empty. If so, we can simplify this code.
- assert len(d) == len(self._sub) + len(self._nonsub)
- assert not self._resources
- for rm in self._resources:
- d[id(rm)] = rm
- L = d.values()
- else:
- L = list(self._resources)
-
- L.sort(rm_cmp)
- return L
-
def abort(self, subtransaction=False):
- if not subtransaction:
- self._synchronizers.map(lambda s: s.beforeCompletion(self))
- if subtransaction and self._nonsub:
- from ZODB.POSException import TransactionError
- raise TransactionError("Resource manager does not support "
- "subtransaction abort")
+ if subtransaction:
+ # TODO deprecate subtransactions
+ if not self._last_savepoint:
+ raise interfaces.InvalidSavepointRollbackError
+ if self._last_savepoint.valid:
+ # We're supposed to be able to call abort(1) multiple
+ # times. Sigh.
+ self._last_savepoint.rollback()
+ return
+ self._invalidate_last_savepoint()
+
+ self._synchronizers.map(lambda s: s.beforeCompletion(self))
+
tb = None
- for rm in self._resources + self._nonsub.values():
+ for rm in self._resources:
try:
rm.abort(self)
except:
@@ -485,21 +450,12 @@
self.log.error("Failed to abort resource manager: %s",
rm, exc_info=sys.exc_info())
- if not subtransaction:
- for rm in self._sub.values():
- try:
- rm.abort_sub(self)
- except:
- if tb is None:
- t, v, tb = sys.exc_info()
- self.log.error("Failed to abort_sub resource manager: %s",
- rm, exc_info=sys.exc_info())
+ if self._manager:
+ self._manager.free(self)
+
+ self._synchronizers.map(lambda s: s.afterCompletion(self))
- if not subtransaction:
- if self._manager:
- self._manager.free(self)
- self._synchronizers.map(lambda s: s.afterCompletion(self))
- self.log.debug("abort")
+ self.log.debug("abort")
if tb is not None:
raise t, v, tb
@@ -539,8 +495,8 @@
def sortKey(self):
return self.manager.sortKey()
- def tpc_begin(self, txn, sub=False):
- self.manager.tpc_begin(txn, sub)
+ def tpc_begin(self, txn):
+ self.manager.tpc_begin(txn)
def tpc_finish(self, txn):
self.manager.tpc_finish(txn)
@@ -571,25 +527,6 @@
if tb is not None:
raise t, v, tb
-class MultiObjectResourceAdapterSub(MultiObjectResourceAdapter):
- """Adapt resource managers that participate in subtransactions."""
-
- def commit_sub(self, txn):
- self.manager.commit_sub(txn)
-
- def abort_sub(self, txn):
- self.manager.abort_sub(txn)
-
- def tpc_begin(self, txn, sub=False):
- self.manager.tpc_begin(txn, sub)
- self.sub = sub
-
- def tpc_finish(self, txn):
- self.manager.tpc_finish(txn)
- if self.sub:
- self.objects = []
-
-
def rm_cmp(rm1, rm2):
return cmp(rm1.sortKey(), rm2.sortKey())
@@ -624,50 +561,82 @@
def __init__(self, datamanager):
self._datamanager = datamanager
- self._rollback = None
# TODO: I'm not sure why commit() doesn't do anything
def commit(self, transaction):
+ # We don't do anything here because ZODB4-style data managers
+ # didn't have a separate commit step
pass
def abort(self, transaction):
-
- # We need to discard any changes since the last save point, or all
- # changes
-
- if self._rollback is None:
- # No previous savepoint, so just abort
- self._datamanager.abort(transaction)
- else:
- self._rollback()
-
- def abort_sub(self, transaction):
self._datamanager.abort(transaction)
- def commit_sub(self, transaction):
- # Nothing to do wrt data, be we begin 2pc for the top-level
- # trans
- self._sub = False
-
- def tpc_begin(self, transaction, subtransaction=False):
- self._sub = subtransaction
-
+ def tpc_begin(self, transaction):
+ # We don't do anything here because ZODB4-style data managers
+ # didn't have a separate tpc_begin step
+ pass
+
def tpc_abort(self, transaction):
- if self._sub:
- self.abort(self, transaction)
- else:
- self._datamanager.abort(transaction)
+ self._datamanager.abort(transaction)
def tpc_finish(self, transaction):
- if self._sub:
- self._rollback = self._datamanager.savepoint(transaction).rollback
- else:
- self._datamanager.commit(transaction)
+ self._datamanager.commit(transaction)
def tpc_vote(self, transaction):
- if not self._sub:
- self._datamanager.prepare(transaction)
+ self._datamanager.prepare(transaction)
def sortKey(self):
return self._datamanager.sortKey()
+
+class Savepoint:
+ """Transaction savepoint
+
+ Transaction savepoints coordinate savepoints for data managers
+ participating in a transaction.
+
+ """
+ interface.implements(interfaces.ISavepoint)
+
+ def __init__(self, optimistic):
+ self._savepoints = []
+ self.valid = True
+ self.next = self.previous = None
+ self.optimistic = optimistic
+
+ def join(self, datamanager):
+ try:
+ savepoint = datamanager.savepoint
+ except AttributeError:
+ if not self.optimistic:
+ raise TypeError("Savepoints unsupported", datamanager)
+ savepoint = NoRollbackSavepoint(datamanager)
+ else:
+ savepoint = savepoint()
+
+ self._savepoints.append(savepoint)
+
+ def rollback(self):
+ if not self.valid:
+ raise interfaces.InvalidSavepointRollbackError
+ self._invalidate_next()
+ for savepoint in self._savepoints:
+ savepoint.rollback()
+
+ def _invalidate_next(self):
+ self.valid = False
+ if self.next is not None:
+ self.next._invalidate_next()
+
+ def _invalidate_previous(self):
+ self.valid = False
+ if self.previous is not None:
+ self.previous._invalidate_previous()
+
+class NoRollbackSavepoint:
+
+ def __init__(self, datamanager):
+ self.datamanager = datamanager
+
+ def rollback(self):
+ raise TypeError("Savepoints unsupported", self.datamanager)
Modified: ZODB/trunk/src/transaction/interfaces.py
===================================================================
--- ZODB/trunk/src/transaction/interfaces.py 2005-04-24 04:26:04 UTC (rev 30135)
+++ ZODB/trunk/src/transaction/interfaces.py 2005-04-24 05:13:15 UTC (rev 30136)
@@ -18,158 +18,47 @@
import zope.interface
-class ISynchronizer(zope.interface.Interface):
- """Objects that participate in the transaction-boundary notification API.
- """
+class ITransactionManager(zope.interface.Interface):
+ """An object that manages a sequence of transactions
- def beforeCompletion(transaction):
- """Hook that is called by the transaction at the start of a commit."""
-
- def afterCompletion(transaction):
- """Hook that is called by the transaction after completing a commit."""
-
-
-class IDataManager(zope.interface.Interface):
- """Objects that manage transactional storage.
-
- These objects may manage data for other objects, or they may manage
- non-object storages, such as relational databases.
-
- IDataManagerOriginal is the interface currently provided by ZODB
- database connections, but the intent is to move to the newer
- IDataManager.
+ Applications use transaction managers to establish transaction boundaries.
"""
- def abort_sub(transaction):
- """Discard all subtransaction data.
- See subtransaction.txt
+ def begin():
+ """Begin a new transaction.
- This is called when top-level transactions are aborted.
-
- No further subtransactions can be started once abort_sub()
- has been called; this is only used when the transaction is
- being aborted.
-
- abort_sub also implies the abort of a 2-phase commit.
-
- This should never fail.
+ If an existing transaction is in progress, it will be aborted.
"""
- def commit_sub(transaction):
- """Commit all changes made in subtransactions and begin 2-phase commit
-
- Data are saved *as if* they are part of the current transaction.
- That is, they will not be persistent unless the current transaction
- is committed.
-
- This is called when the current top-level transaction is committed.
-
- No further subtransactions can be started once commit_sub()
- has been called; this is only used when the transaction is
- being committed.
-
- This call also implies the beginning of 2-phase commit.
+ def get():
+ """Get the current transaction.
"""
- # Two-phase commit protocol. These methods are called by the
- # ITransaction object associated with the transaction being
- # committed.
-
- def tpc_begin(transaction, subtransaction=False):
- """Begin commit of a transaction, starting the two-phase commit.
-
- transaction is the ITransaction instance associated with the
- transaction being committed.
-
- subtransaction is a Boolean flag indicating whether the
- two-phase commit is being invoked for a subtransaction.
-
- Important note: Subtransactions are modelled in the sense that
- when you commit a subtransaction, subsequent commits should be
- for subtransactions as well. That is, there must be a
- commit_sub() call between a tpc_begin() call with the
- subtransaction flag set to true and a tpc_begin() with the
- flag set to false.
-
+ def commit():
+ """Commit the current transaction
"""
- def tpc_abort(transaction):
- """Abort a transaction.
-
- This is called by a transaction manager to end a two-phase commit on
- the data manager.
-
- This is always called after a tpc_begin call.
-
- transaction is the ITransaction instance associated with the
- transaction being committed.
-
- This should never fail.
+ def abort(self):
+ """Abort the current transaction
"""
- def tpc_finish(transaction):
- """Indicate confirmation that the transaction is done.
+ def registerSynch(synch):
+ """Register an ISynchronizer.
- transaction is the ITransaction instance associated with the
- transaction being committed.
-
- This should never fail. If this raises an exception, the
- database is not expected to maintain consistency; it's a
- serious error.
-
- It's important that the storage calls the passed function
- while it still has its lock. We don't want another thread
- to be able to read any updated data until we've had a chance
- to send an invalidation message to all of the other
- connections!
+ Synchronizers are notified at the beginning and end of
+ transaction completion.
+
"""
- def tpc_vote(transaction):
- """Verify that a data manager can commit the transaction
+ def unregisterSynch(synch):
+ """Unregister an ISynchronizer.
- This is the last chance for a data manager to vote 'no'. A
- data manager votes 'no' by raising an exception.
-
- transaction is the ITransaction instance associated with the
- transaction being committed.
+ Synchronizers are notified at the beginning and end of
+ transaction completion.
+
"""
- def commit(transaction):
- """Commit modifications to registered objects.
-
- Save the object as part of the data to be made persistent if
- the transaction commits.
-
- This includes conflict detection and handling. If no conflicts or
- errors occur it saves the objects in the storage.
- """
-
- def abort(transaction):
- """Abort a transaction and forget all changes.
-
- Abort must be called outside of a two-phase commit.
-
- Abort is called by the transaction manager to abort transactions
- that are not yet in a two-phase commit.
- """
-
- def sortKey():
- """Return a key to use for ordering registered DataManagers
-
- ZODB uses a global sort order to prevent deadlock when it commits
- transactions involving multiple resource managers. The resource
- manager must define a sortKey() method that provides a global ordering
- for resource managers.
- """
- # Alternate version:
- #"""Return a consistent sort key for this connection.
- #
- #This allows ordering multiple connections that use the same storage in
- #a consistent manner. This is unique for the lifetime of a connection,
- #which is good enough to avoid ZEO deadlocks.
- #"""
-
class ITransaction(zope.interface.Interface):
"""Object representing a running transaction.
@@ -219,9 +108,17 @@
def join(datamanager):
"""Add a datamanager to the transaction.
+ If the data manager supports savepoints, it must call this
+ *before* making any changes. If the transaction has had any
+ savepoints, then it will take a savepoint of the data manager
+ when join is called and this savepoint must reflect the state
+ of the data manager before any changes that caused the data
+ manager to join the transaction.
+
The datamanager must implement the
transactions.interfaces.IDataManager interface, and be
adaptable to ZODB.interfaces.IDataManager.
+
"""
def note(text):
@@ -293,3 +190,161 @@
# TODO: deprecate this for 3.6.
def register(object):
"""Register the given object for transaction control."""
+
+class IDataManager(zope.interface.Interface):
+ """Objects that manage transactional storage.
+
+ These objects may manage data for other objects, or they may manage
+ non-object storages, such as relational databases.
+
+ IDataManagerOriginal is the interface currently provided by ZODB
+ database connections, but the intent is to move to the newer
+ IDataManager.
+
+ Note that when data are modified, data managers should join a
+ transaction so that data can be committed when the user commits
+ the transaction.
+
+ """
+
+ # Two-phase commit protocol. These methods are called by the
+ # ITransaction object associated with the transaction being
+ # committed.
+
+ def abort(transaction):
+ """Abort a transaction and forget all changes.
+
+ Abort must be called outside of a two-phase commit.
+
+ Abort is called by the transaction manager to abort transactions
+ that are not yet in a two-phase commit.
+ """
+
+ def tpc_begin(transaction):
+ """Begin commit of a transaction, starting the two-phase commit.
+
+ transaction is the ITransaction instance associated with the
+ transaction being committed.
+
+ subtransaction is a Boolean flag indicating whether the
+ two-phase commit is being invoked for a subtransaction.
+
+ Important note: Subtransactions are modelled in the sense that
+ when you commit a subtransaction, subsequent commits should be
+ for subtransactions as well. That is, there must be a
+ commit_sub() call between a tpc_begin() call with the
+ subtransaction flag set to true and a tpc_begin() with the
+ flag set to false.
+
+ """
+
+ def commit(transaction):
+ """Commit modifications to registered objects.
+
+ Save the object as part of the data to be made persistent if
+ the transaction commits.
+
+ This includes conflict detection and handling. If no conflicts or
+ errors occur it saves the objects in the storage.
+ """
+
+ def tpc_abort(transaction):
+ """Abort a transaction.
+
+ This is called by a transaction manager to end a two-phase commit on
+ the data manager.
+
+ This is always called after a tpc_begin call.
+
+ transaction is the ITransaction instance associated with the
+ transaction being committed.
+
+ This should never fail.
+ """
+
+ def tpc_vote(transaction):
+ """Verify that a data manager can commit the transaction
+
+ This is the last chance for a data manager to vote 'no'. A
+ data manager votes 'no' by raising an exception.
+
+ transaction is the ITransaction instance associated with the
+ transaction being committed.
+ """
+
+ def tpc_finish(transaction):
+ """Indicate confirmation that the transaction is done.
+
+ transaction is the ITransaction instance associated with the
+ transaction being committed.
+
+ This should never fail. If this raises an exception, the
+ database is not expected to maintain consistency; it's a
+ serious error.
+
+ It's important that the storage calls the passed function
+ while it still has its lock. We don't want another thread
+ to be able to read any updated data until we've had a chance
+ to send an invalidation message to all of the other
+ connections!
+ """
+
+ def sortKey():
+ """Return a key to use for ordering registered DataManagers
+
+ ZODB uses a global sort order to prevent deadlock when it commits
+ transactions involving multiple resource managers. The resource
+ manager must define a sortKey() method that provides a global ordering
+ for resource managers.
+ """
+ # Alternate version:
+ #"""Return a consistent sort key for this connection.
+ #
+ #This allows ordering multiple connections that use the same storage in
+ #a consistent manner. This is unique for the lifetime of a connection,
+ #which is good enough to avoid ZEO deadlocks.
+ #"""
+
+class ISavepointDataManager(IDataManager):
+
+ def savepoint():
+ """Return a savepoint (ISavepoint)
+ """
+
+class ISavepoint(zope.interface.Interface):
+ """A transaction savepoint
+ """
+
+ def rollback():
+ """Rollback any work done since the savepoint
+
+ An InvalidSavepointRollbackError is raised if the savepoint
+ isn't valid.
+
+ """
+
+ valid = zope.interface.Attribute(
+ "Boolean indicating whether the savepoint is valid")
+
+class InvalidSavepointRollbackError(Exception):
+ """Attempt to rollback an invalid savepoint
+
+ A savepoint may be invalid because:
+
+ - The surrounding transaction has committed or aborted
+
+ - An earlier savepoint in the same transaction has been rolled back
+ """
+
+class ISynchronizer(zope.interface.Interface):
+ """Objects that participate in the transaction-boundary notification API.
+ """
+
+ def beforeCompletion(transaction):
+ """Hook that is called by the transaction at the start of a commit.
+ """
+
+ def afterCompletion(transaction):
+ """Hook that is called by the transaction after completing a commit.
+ """
+
Copied: ZODB/trunk/src/transaction/savepoint.txt (from rev 30131, ZODB/branches/3.4/src/transaction/savepoint.txt)
Property changes on: ZODB/trunk/src/transaction/savepoint.txt
___________________________________________________________________
Name: svn:eol-style
+ native
Copied: ZODB/trunk/src/transaction/tests/savepointsample.py (from rev 30131, ZODB/branches/3.4/src/transaction/tests/savepointsample.py)
Property changes on: ZODB/trunk/src/transaction/tests/savepointsample.py
___________________________________________________________________
Name: svn:keywords
+ Id
Name: svn:eol-style
+ native
Modified: ZODB/trunk/src/transaction/tests/test_register_compat.py
===================================================================
--- ZODB/trunk/src/transaction/tests/test_register_compat.py 2005-04-24 04:26:04 UTC (rev 30135)
+++ ZODB/trunk/src/transaction/tests/test_register_compat.py 2005-04-24 05:13:15 UTC (rev 30136)
@@ -128,7 +128,7 @@
def sortKey(self):
return str(id(self))
- def tpc_begin(self, txn, sub):
+ def tpc_begin(self, txn):
self.calls.append("begin")
def tpc_vote(self, txn):
Copied: ZODB/trunk/src/transaction/tests/test_savepoint.py (from rev 30131, ZODB/branches/3.4/src/transaction/tests/test_savepoint.py)
Property changes on: ZODB/trunk/src/transaction/tests/test_savepoint.py
___________________________________________________________________
Name: svn:keywords
+ Id
Name: svn:eol-style
+ native
Modified: ZODB/trunk/src/transaction/tests/test_transaction.py
===================================================================
--- ZODB/trunk/src/transaction/tests/test_transaction.py 2005-04-24 04:26:04 UTC (rev 30135)
+++ ZODB/trunk/src/transaction/tests/test_transaction.py 2005-04-24 05:13:15 UTC (rev 30136)
@@ -85,101 +85,7 @@
t.abort()
- def testSubTransactionCommitCommit(self):
- self.sub1.modify()
- self.sub2.modify()
-
- self.txn_mgr.commit(1)
-
- assert self.sub1._p_jar.ctpc_vote == 0
- assert self.sub1._p_jar.ctpc_finish == 1
-
- self.txn_mgr.commit()
-
- assert self.sub1._p_jar.ccommit_sub == 1
- assert self.sub1._p_jar.ctpc_vote == 1
-
- def testSubTransactionCommitAbort(self):
-
- self.sub1.modify()
- self.sub2.modify()
-
- self.txn_mgr.commit(1)
- self.txn_mgr.abort()
-
- assert self.sub1._p_jar.ctpc_vote == 0
- assert self.sub1._p_jar.cabort == 0
- assert self.sub1._p_jar.cabort_sub == 1
-
- def testMultipleSubTransactionCommitCommit(self):
- self.sub1.modify()
- self.txn_mgr.commit(1)
-
- self.sub2.modify()
- # reset a flag on the original to test it again
- self.sub1.ctpc_finish = 0
- self.txn_mgr.commit(1)
-
- # this is interesting.. we go through
- # every subtrans commit with all subtrans capable
- # objects... i don't like this but its an impl artifact
-
- assert self.sub1._p_jar.ctpc_vote == 0
- assert self.sub1._p_jar.ctpc_finish > 0
-
- # add another before we do the entire txn commit
- self.sub3.modify()
-
- self.txn_mgr.commit()
-
- # we did an implicit sub commit, is this impl artifact?
- assert self.sub3._p_jar.ccommit_sub == 1
- assert self.sub1._p_jar.ctpc_finish > 1
-
-
- def testMultipleSubTransactionCommitAbortSub(self):
- """
- sub1 calling method commit
- sub1 calling method tpc_finish
- sub2 calling method tpc_begin
- sub2 calling method commit
- sub2 calling method tpc_finish
- sub3 calling method abort
- sub1 calling method commit_sub
- sub2 calling method commit_sub
- sub2 calling method tpc_vote
- sub1 calling method tpc_vote
- sub1 calling method tpc_finish
- sub2 calling method tpc_finish
- """
-
- # add it
- self.sub1.modify()
-
- self.txn_mgr.commit(1)
-
- # add another
- self.sub2.modify()
-
- self.txn_mgr.commit(1)
-
- assert self.sub1._p_jar.ctpc_vote == 0
- assert self.sub1._p_jar.ctpc_finish > 0
-
- # add another before we do the entire txn commit
- self.sub3.modify()
-
- # abort the sub transaction
- self.txn_mgr.abort(1)
-
- # commit the container transaction
- self.txn_mgr.commit()
-
- assert self.sub3._p_jar.cabort == 1
- assert self.sub1._p_jar.ccommit_sub == 1
- assert self.sub1._p_jar.ctpc_finish > 1
-
# repeat adding in a nonsub trans jars
def testNSJTransactionCommit(self):
@@ -230,69 +136,7 @@
assert self.nosub1._p_jar.cabort == 1
assert self.sub1._p_jar.cabort_sub == 1
- def testNSJSubTransactionCommitCommit(self):
- self.sub1.modify()
- self.nosub1.modify()
-
- self.txn_mgr.commit(1)
-
- assert self.nosub1._p_jar.ctpc_vote == 0
-
- self.txn_mgr.commit()
-
- #assert self.nosub1._p_jar.ccommit_sub == 0
- assert self.nosub1._p_jar.ctpc_vote == 1
- assert self.sub1._p_jar.ccommit_sub == 1
- assert self.sub1._p_jar.ctpc_vote == 1
-
-
- def testNSJMultipleSubTransactionCommitCommit(self):
- """
- sub1 calling method tpc_begin
- sub1 calling method commit
- sub1 calling method tpc_finish
- nosub calling method tpc_begin
- nosub calling method tpc_finish
- sub2 calling method tpc_begin
- sub2 calling method commit
- sub2 calling method tpc_finish
- nosub calling method tpc_begin
- nosub calling method commit
- sub1 calling method commit_sub
- sub2 calling method commit_sub
- sub1 calling method tpc_vote
- nosub calling method tpc_vote
- sub2 calling method tpc_vote
- sub2 calling method tpc_finish
- nosub calling method tpc_finish
- sub1 calling method tpc_finish
- """
-
- # add it
- self.sub1.modify()
-
- self.txn_mgr.commit(1)
-
- # add another
- self.nosub1.modify()
-
- self.txn_mgr.commit(1)
-
- assert self.sub1._p_jar.ctpc_vote == 0
- assert self.nosub1._p_jar.ctpc_vote == 0
- assert self.sub1._p_jar.ctpc_finish > 0
-
- # add another before we do the entire txn commit
- self.sub2.modify()
-
- # commit the container transaction
- self.txn_mgr.commit()
-
- # we did an implicit sub commit
- assert self.sub2._p_jar.ccommit_sub == 1
- assert self.sub1._p_jar.ctpc_finish > 1
-
### Failure Mode Tests
#
# ok now we do some more interesting
@@ -387,81 +231,7 @@
assert self.nosub1._p_jar.ctpc_abort == 1
- ### More Failure modes...
- # now we mix in some sub transactions
- ###
- def testExceptionInSubCommitSub(self):
- # It's harder than normal to verify test results, because
- # the subtransaction jars are stored in a dictionary. The
- # order in which jars are processed depends on the order
- # they come out of the dictionary.
-
- self.sub1.modify()
- self.txn_mgr.commit(1)
-
- self.nosub1.modify()
-
- self.sub2._p_jar = SubTransactionJar(errors='commit_sub')
- self.sub2.modify(nojar=1)
-
- self.txn_mgr.commit(1)
-
- self.sub3.modify()
-
- try:
- self.txn_mgr.commit()
- except TestTxnException:
- pass
-
- if self.sub1._p_jar.ccommit_sub:
- self.assertEqual(self.sub1._p_jar.ctpc_abort, 1)
- else:
- self.assertEqual(self.sub1._p_jar.cabort_sub, 1)
-
- self.assertEqual(self.sub2._p_jar.ctpc_abort, 1)
- self.assertEqual(self.nosub1._p_jar.ctpc_abort, 1)
-
- if self.sub3._p_jar.ccommit_sub:
- self.assertEqual(self.sub3._p_jar.ctpc_abort, 1)
- else:
- self.assertEqual(self.sub3._p_jar.cabort_sub, 1)
-
- def testExceptionInSubAbortSub(self):
- # This test has two errors. When commit_sub() is called on
- # sub1, it will fail. If sub1 is handled first, it will raise
- # an except and abort_sub() will be called on sub2. If sub2
- # is handled first, then commit_sub() will fail after sub2 has
- # already begun its top-level transaction and tpc_abort() will
- # be called.
-
- self.sub1._p_jar = SubTransactionJar(errors='commit_sub')
- self.sub1.modify(nojar=1)
- self.txn_mgr.commit(1)
-
- self.nosub1.modify()
- self.sub2._p_jar = SubTransactionJar(errors='abort_sub')
- self.sub2.modify(nojar=1)
- self.txn_mgr.commit(1)
-
- self.sub3.modify()
-
- try:
- self.txn_mgr.commit()
- except TestTxnException, err:
- pass
- else:
- self.fail("expected transaction to fail")
-
- # The last commit failed. If the commit_sub() method was
- # called, then tpc_abort() should be called to abort the
- # actual transaction. If not, then calling abort_sub() is
- # sufficient.
- if self.sub3._p_jar.ccommit_sub:
- self.assertEqual(self.sub3._p_jar.ctpc_abort, 1)
- else:
- self.assertEqual(self.sub3._p_jar.cabort_sub, 1)
-
# last test, check the hosing mechanism
## def testHoserStoppage(self):
@@ -507,7 +277,7 @@
self._p_jar = NoSubTransactionJar(tracing=tracing)
else:
self._p_jar = SubTransactionJar(tracing=tracing)
- self.txn_mgr.get().register(self)
+ self.txn_mgr.get().join(self._p_jar)
class TestTxnException(Exception):
pass
More information about the Zodb-checkins
mailing list