[Zodb-checkins] SVN: ZODB/branches/3.4/src/ Added savepoints!

Jim Fulton jim at zope.com
Sat Apr 23 21:29:02 EDT 2005

Log message for revision 30129:
  Added savepoints!
  (And also added interfaces and rearranged some code to hopefully make
  it easier to read.)

  U   ZODB/branches/3.4/src/ZODB/Connection.py
  D   ZODB/branches/3.4/src/ZODB/TmpStore.py
  U   ZODB/branches/3.4/src/ZODB/tests/testConnection.py
  A   ZODB/branches/3.4/src/ZODB/tests/testConnectionSavepoint.py
  A   ZODB/branches/3.4/src/ZODB/tests/testConnectionSavepoint.txt
  U   ZODB/branches/3.4/src/ZODB/tests/testZODB.py
  U   ZODB/branches/3.4/src/ZODB/tests/test_datamanageradapter.py
  U   ZODB/branches/3.4/src/transaction/__init__.py
  U   ZODB/branches/3.4/src/transaction/_manager.py
  U   ZODB/branches/3.4/src/transaction/_transaction.py
  U   ZODB/branches/3.4/src/transaction/interfaces.py
  A   ZODB/branches/3.4/src/transaction/savepoint.txt
  A   ZODB/branches/3.4/src/transaction/tests/savepointsample.py
  U   ZODB/branches/3.4/src/transaction/tests/test_register_compat.py
  A   ZODB/branches/3.4/src/transaction/tests/test_savepoint.py

Modified: ZODB/branches/3.4/src/ZODB/Connection.py
--- ZODB/branches/3.4/src/ZODB/Connection.py	2005-04-24 01:28:59 UTC (rev 30128)
+++ ZODB/branches/3.4/src/ZODB/Connection.py	2005-04-24 01:29:02 UTC (rev 30129)
@@ -17,6 +17,7 @@
 import logging
 import sys
+import tempfile
 import threading
 import warnings
 from time import time
@@ -33,13 +34,12 @@
 from ZODB.ConflictResolution import ResolvedSerial
 from ZODB.ExportImport import ExportImport
-from ZODB.POSException \
-     import ConflictError, ReadConflictError, InvalidObjectReference, \
-            ConnectionStateError
-from ZODB.TmpStore import TmpStore
+from ZODB import POSException
+from ZODB.POSException import InvalidObjectReference, ConnectionStateError
+from ZODB.POSException import ConflictError, ReadConflictError
 from ZODB.serialize import ObjectWriter, ObjectReader, myhasattr
-from ZODB.utils import u64, oid_repr, z64, positive_id, \
-        DEPRECATED_ARGUMENT, deprecated36
+from ZODB.utils import DEPRECATED_ARGUMENT, deprecated36
+from ZODB.utils import p64, u64, z64, oid_repr, positive_id
 global_reset_counter = 0
@@ -61,17 +61,19 @@
     implements(IConnection, IDataManager, IPersistentDataManager)
+    _storage = _normal_storage = _savepoint_storage = None
     _tmp = None
     _code_timestamp = 0
-    # ZODB.IConnection
+    ##########################################################################
+    # Connection methods, ZODB.IConnection
     def __init__(self, version='', cache_size=400,
                  cache_deactivate_after=None, mvcc=True, txn_mgr=None,
         """Create a new Connection."""
         self._log = logging.getLogger("ZODB.Connection")
-        self._storage = None
         self._debug_info = ()
         self._opened = None # time.time() when DB.open() opened us
@@ -150,39 +152,6 @@
         self.connections = None
-    def get_connection(self, database_name):
-        """Return a Connection for the named database."""
-        connection = self.connections.get(database_name)
-        if connection is None:
-            new_con = self._db.databases[database_name].open()
-            self.connections.update(new_con.connections)
-            new_con.connections = self.connections
-            connection = new_con
-        return connection
-    def get(self, oid):
-        """Return the persistent object with oid 'oid'."""
-        if self._storage is None:
-            raise ConnectionStateError("The database connection is closed")
-        obj = self._cache.get(oid, None)
-        if obj is not None:
-            return obj
-        obj = self._added.get(oid, None)
-        if obj is not None:
-            return obj
-        p, serial = self._storage.load(oid, self._version)
-        obj = self._reader.getGhost(p)
-        obj._p_oid = oid
-        obj._p_jar = self
-        obj._p_changed = None
-        obj._p_serial = serial
-        self._cache[oid] = obj
-        return obj
     def add(self, obj):
         """Add a new object 'obj' to the database and assign it an oid."""
         if self._storage is None:
@@ -207,52 +176,41 @@
         elif obj._p_jar is not self:
             raise InvalidObjectReference(obj, obj._p_jar)
-    def sortKey(self):
-        """Return a consistent sort key for this connection."""
-        return "%s:%s" % (self._storage.sortKey(), id(self))
+    def get(self, oid):
+        """Return the persistent object with oid 'oid'."""
+        if self._storage is None:
+            raise ConnectionStateError("The database connection is closed")
-    def abort(self, transaction):
-        """Abort a transaction and forget all changes."""
-        for obj in self._registered_objects:
-            oid = obj._p_oid
-            assert oid is not None
-            if oid in self._added:
-                del self._added[oid]
-                del obj._p_jar
-                del obj._p_oid
-            else:
+        obj = self._cache.get(oid, None)
+        if obj is not None:
+            return obj
+        obj = self._added.get(oid, None)
+        if obj is not None:
+            return obj
-                # Note: If we invalidate a non-ghostifiable object
-                # (i.e. a persistent class), the object will
-                # immediately reread it's state.  That means that the
-                # following call could result in a call to
-                # self.setstate, which, of course, must suceed.
-                # In general, it would be better if the read could be
-                # delayed until the start of the next transaction.  If
-                # we read at the end of a transaction and if the
-                # object was invalidated during this transaction, then
-                # we'll read non-current data, which we'll discard
-                # later in transaction finalization.  Unfortnately, we
-                # can only delay the read if this abort corresponds to
-                # a top-level-transaction abort.  We can't tell if
-                # this is a top-level-transaction abort, so we have to
-                # go ahead and invalidate now. Fortunately, it's
-                # pretty unlikely that the object we are invalidating
-                # was invalidated by another thread, so the risk of a
-                # reread is pretty low.
+        p, serial = self._storage.load(oid, self._version)
+        obj = self._reader.getGhost(p)
-                self._cache.invalidate(oid)
+        obj._p_oid = oid
+        obj._p_jar = self
+        obj._p_changed = None
+        obj._p_serial = serial
-        self._tpc_cleanup()
+        self._cache[oid] = obj
+        return obj
+    def cacheMinimize(self, dt=DEPRECATED_ARGUMENT):
+        """Deactivate all unmodified objects in the cache."""
+        if dt is not DEPRECATED_ARGUMENT:
+            deprecated36("cacheMinimize() dt= is ignored.")
+        self._cache.minimize()
     # TODO: we should test what happens when cacheGC is called mid-transaction.
     def cacheGC(self):
         """Reduce cache size to target size."""
     __onCloseCallbacks = None
     def onCloseCallback(self, f):
         """Register a callable, f, to be called by close()."""
         if self.__onCloseCallbacks is None:
@@ -266,12 +224,6 @@
             raise ConnectionStateError("Cannot close a connection joined to "
                                        "a transaction")
-        if self._tmp is not None:
-            # There are no direct modifications pending, but a subtransaction
-            # is pending.
-            raise ConnectionStateError("Cannot close a connection with a "
-                                       "pending subtransaction")
         if self._cache is not None:
             self._cache.incrgc() # This is a good time to do some GC
@@ -285,7 +237,8 @@
                     self._log.error("Close callback failed for %s", f,
             self.__onCloseCallbacks = None
-        self._storage = self._tmp = self.new_oid = None
+        self._storage = self._savepoint_storage = self._normal_storage = None
+        self.new_oid = None
         self._debug_info = ()
         self._opened = None
         # Return the connection to the pool.
@@ -297,10 +250,192 @@
             # assert that here, because self may have been reused (by
             # another thread) by the time we get back here.
-    # transaction.interfaces.IDataManager
+    def db(self):
+        """Returns a handle to the database this connection belongs to."""
+        return self._db
+    def isReadOnly(self):
+        """Returns True if the storage for this connection is read only."""
+        if self._storage is None:
+            raise ConnectionStateError("The database connection is closed")
+        return self._storage.isReadOnly()
+    def invalidate(self, tid, oids):
+        """Notify the Connection that transaction 'tid' invalidated oids."""
+        self._inv_lock.acquire()
+        try:
+            if self._txn_time is None:
+                self._txn_time = tid
+            self._invalidated.update(oids)
+        finally:
+            self._inv_lock.release()
+    def root(self):
+        """Return the database root object."""
+        return self.get(z64)
+    def getVersion(self):
+        """Returns the version this connection is attached to."""
+        if self._storage is None:
+            raise ConnectionStateError("The database connection is closed")
+        return self._version
+    def get_connection(self, database_name):
+        """Return a Connection for the named database."""
+        connection = self.connections.get(database_name)
+        if connection is None:
+            new_con = self._db.databases[database_name].open()
+            self.connections.update(new_con.connections)
+            new_con.connections = self.connections
+            connection = new_con
+        return connection
+    def sync(self):
+        """Manually update the view on the database."""
+        self._txn_mgr.get().abort()
+        sync = getattr(self._storage, 'sync', 0)
+        if sync:
+            sync()
+        self._flush_invalidations()
+    def getDebugInfo(self):
+        """Returns a tuple with different items for debugging the
+        connection.
+        """
+        return self._debug_info
+    def setDebugInfo(self, *args):
+        """Add the given items to the debug information of this connection."""
+        self._debug_info = self._debug_info + args
+    def getTransferCounts(self, clear=False):
+        """Returns the number of objects loaded and stored."""
+        res = self._load_count, self._store_count
+        if clear:
+            self._load_count = 0
+            self._store_count = 0
+        return res
+    # Connection methods
+    ##########################################################################
+    ##########################################################################
+    # Data manager (IDataManager) methods
+    def abort(self, transaction):
+        """Abort a transaction and forget all changes."""
+        if self._savepoint_storage is not None:
+            self._abort_savepoint()
+        self._abort()
+        self._tpc_cleanup()
+    def _abort(self):
+        """Abort a transaction and forget all changes."""
+        for obj in self._registered_objects:
+            oid = obj._p_oid
+            assert oid is not None
+            if oid in self._added:
+                del self._added[oid]
+                del obj._p_jar
+                del obj._p_oid
+            else:
+                # Note: If we invalidate a non-ghostifiable object
+                # (i.e. a persistent class), the object will
+                # immediately reread its state.  That means that the
+                # following call could result in a call to
+                # self.setstate, which, of course, must succeed.
+                # In general, it would be better if the read could be
+                # delayed until the start of the next transaction.  If
+                # we read at the end of a transaction and if the
+                # object was invalidated during this transaction, then
+                # we'll read non-current data, which we'll discard
+                # later in transaction finalization.  Unfortunately, we
+                # can only delay the read if this abort corresponds to
+                # a top-level-transaction abort.  We can't tell if
+                # this is a top-level-transaction abort, so we have to
+                # go ahead and invalidate now. Fortunately, it's
+                # pretty unlikely that the object we are invalidating
+                # was invalidated by another thread, so the risk of a
+                # reread is pretty low.
+                self._cache.invalidate(oid)
+    def _tpc_cleanup(self):
+        """Performs cleanup operations to support tpc_finish and tpc_abort."""
+        self._conflicts.clear()
+        if not self._synch:
+            self._flush_invalidations()
+        self._needs_to_join = True
+        self._registered_objects = []
+    def _flush_invalidations(self):
+        self._inv_lock.acquire()
+        try:
+            # Non-ghostifiable objects may need to read when they are
+            # invalidated, so, we'll quickly just replace the
+            # invalidating dict with a new one.  We'll then process
+            # the invalidations after freeing the lock *and* after
+            # resetting the time.  This means that invalidations will
+            # happen after the start of the transactions.  They are
+            # subject to conflict errors and to reading old data.
+            # TODO: There is a potential problem lurking for persistent
+            # classes.  Suppose we have an invalidation of a persistent
+            # class and of an instance.  If the instance is
+            # invalidated first and if the invalidation logic uses
+            # data read from the class, then the invalidation could
+            # be performed with stale data.  Or, suppose that there
+            # are instances of the class that are freed as a result of
+            # invalidating some object.  Perhaps code in their __del__
+            # uses class data.  Really, the only way to properly fix
+            # this is to, in fact, make classes ghostifiable.  Then
+            # we'd have to reimplement attribute lookup to check the
+            # class state and, if necessary, activate the class.  It's
+            # much worse than that though, because we'd also need to
+            # deal with slots.  When a class is ghostified, we'd need
+            # to replace all of the slot operations with versions that
+            # reloaded the object when called. It's hard to say which
+            # is better or worse.  For now, the risk of
+            # using a class while objects are being invalidated seems
+            # small enough to be acceptable.
+            invalidated = self._invalidated
+            self._invalidated = {}
+            self._txn_time = None
+        finally:
+            self._inv_lock.release()
+        self._cache.invalidate(invalidated)
+        # Now is a good time to collect some garbage.
+        self._cache.incrgc()
+    def tpc_begin(self, transaction):
+        """Begin commit of a transaction, starting the two-phase commit."""
+        self._modified = []
+        # _creating is a list of oids of new objects, which is used to
+        # remove them from the cache if a transaction aborts.
+        self._creating = []
+        self._normal_storage.tpc_begin(transaction)
     def commit(self, transaction):
         """Commit changes to an object"""
+        if self._savepoint_storage is not None:
+            self._commit_savepoint(transaction)
+        self._commit(transaction)
+    def _commit(self, transaction):
+        """Commit changes to an object"""
         if self._import:
             # TODO:  This code seems important for Zope, but needs docs
             # to explain why.
@@ -377,169 +512,6 @@
             self._handle_serial(s, oid)
-    def commit_sub(self, t):
-        """Commit all changes made in subtransactions and begin 2-phase commit
-        """
-        if self._tmp is None:
-            return
-        src = self._storage
-        self._storage = self._tmp
-        self._tmp = None
-        self._log.debug("Commiting subtransaction of size %s", src.getSize())
-        oids = src._index.keys()
-        self._storage.tpc_begin(t)
-        # Copy invalidating and creating info from temporary storage:
-        self._modified.extend(oids)
-        self._creating.extend(src._creating)
-        for oid in oids:
-            data, serial = src.load(oid, src)
-            s = self._storage.store(oid, serial, data, self._version, t)
-            self._handle_serial(s, oid, change=False)
-    def abort_sub(self, t):
-        """Discard all subtransaction data."""
-        if self._tmp is None:
-            return
-        src = self._storage
-        self._storage = self._tmp
-        self._tmp = None
-        # Note: If we invalidate a non-ghostifiable object (i.e. a
-        # persistent class), the object will immediately reread it's
-        # state.  That means that the following call could result in a
-        # call to self.setstate, which, of course, must succeed.  In
-        # general, it would be better if the read could be delayed
-        # until the start of the next transaction.  If we read at the
-        # end of a transaction and if the object was invalidated
-        # during this transaction, then we'll read non-current data,
-        # which we'll discard later in transaction finalization.  We
-        # could, theoretically queue this invalidation by calling
-        # self.invalidate.  Unfortunately, attempts to make that
-        # change resulted in mysterious test failures.  It's pretty
-        # unlikely that the object we are invalidating was invalidated
-        # by another thread, so the risk of a reread is pretty low.
-        # It's really not worth the effort to pursue this.
-        self._cache.invalidate(src._index.keys())
-        self._invalidate_creating(src._creating)
-    def _invalidate_creating(self, creating=None):
-        """Disown any objects newly saved in an uncommitted transaction."""
-        if creating is None:
-            creating = self._creating
-            self._creating = []
-        for oid in creating:
-            o = self._cache.get(oid)
-            if o is not None:
-                del self._cache[oid]
-                del o._p_jar
-                del o._p_oid
-    # The next two methods are callbacks for transaction synchronization.
-    def beforeCompletion(self, txn):
-        # We don't do anything before a commit starts.
-        pass
-    def afterCompletion(self, txn):
-        self._flush_invalidations()
-    def _flush_invalidations(self):
-        self._inv_lock.acquire()
-        try:
-            # Non-ghostifiable objects may need to read when they are
-            # invalidated, so, we'll quickly just replace the
-            # invalidating dict with a new one.  We'll then process
-            # the invalidations after freeing the lock *and* after
-            # reseting the time.  This means that invalidations will
-            # happen after the start of the transactions.  They are
-            # subject to conflict errors and to reading old data,
-            # TODO: There is a potential problem lurking for persistent
-            # classes.  Suppose we have an invlidation of a persistent
-            # class and of an instance.  If the instance is
-            # invalidated first and if the invalidation logic uses
-            # data read from the class, then the invalidation could
-            # be performed with state data.  Or, suppose that there
-            # are instances of the class that are freed as a result of
-            # invalidating some object.  Perhaps code in their __del__
-            # uses class data.  Really, the only way to properly fix
-            # this is to, in fact, make classes ghostifiable.  Then
-            # we'd have to reimplement attribute lookup to check the
-            # class state and, if necessary, activate the class.  It's
-            # much worse than that though, because we'd also need to
-            # deal with slots.  When a class is ghostified, we'd need
-            # to replace all of the slot operations with versions that
-            # reloaded the object when caled. It's hard to say which
-            # is better for worse.  For now, it seems the risk of
-            # using a class while objects are being invalidated seems
-            # small enough t be acceptable.
-            invalidated = self._invalidated
-            self._invalidated = {}
-            self._txn_time = None
-        finally:
-            self._inv_lock.release()
-        self._cache.invalidate(invalidated)
-        # Now is a good time to collect some garbage.
-        self._cache.incrgc()
-    def root(self):
-        """Return the database root object."""
-        return self.get(z64)
-    def db(self):
-        """Returns a handle to the database this connection belongs to."""
-        return self._db
-    def isReadOnly(self):
-        """Returns True if the storage for this connection is read only."""
-        if self._storage is None:
-            raise ConnectionStateError("The database connection is closed")
-        return self._storage.isReadOnly()
-    def invalidate(self, tid, oids):
-        """Notify the Connection that transaction 'tid' invalidated oids."""
-        self._inv_lock.acquire()
-        try:
-            if self._txn_time is None:
-                self._txn_time = tid
-            self._invalidated.update(oids)
-        finally:
-            self._inv_lock.release()
-    # IDataManager
-    def tpc_begin(self, transaction, sub=False):
-        """Begin commit of a transaction, starting the two-phase commit."""
-        self._modified = []
-        # _creating is a list of oids of new objects, which is used to
-        # remove them from the cache if a transaction aborts.
-        self._creating = []
-        if sub and self._tmp is None:
-            # Sub-transaction!
-            self._tmp = self._storage
-            self._storage = TmpStore(self._version, self._storage)
-        self._storage.tpc_begin(transaction)
-    def tpc_vote(self, transaction):
-        """Verify that a data manager can commit the transaction."""
-        try:
-            vote = self._storage.tpc_vote
-        except AttributeError:
-            return
-        s = vote(transaction)
-        self._handle_serial(s)
     def _handle_serial(self, store_return, oid=None, change=1):
         """Handle the returns from store() and tpc_vote() calls."""
@@ -582,26 +554,13 @@
                 obj._p_changed = 0 # transition from changed to up-to-date
             obj._p_serial = serial
-    def tpc_finish(self, transaction):
-        """Indicate confirmation that the transaction is done."""
-        if self._tmp is not None:
-            # Commiting a subtransaction!
-            # There is no need to invalidate anything.
-            self._storage.tpc_finish(transaction)
-            self._storage._creating[:0]=self._creating
-            del self._creating[:]
-        else:
-            def callback(tid):
-                d = {}
-                for oid in self._modified:
-                    d[oid] = 1
-                self._db.invalidate(tid, d, self)
-            self._storage.tpc_finish(transaction, callback)
-        self._tpc_cleanup()
     def tpc_abort(self, transaction):
         if self._import:
             self._import = None
+        if self._savepoint_storage is not None:
+            self._abort_savepoint()
         # Note: If we invalidate a non-justifiable object (i.e. a
@@ -628,41 +587,59 @@
             del obj._p_jar
-    def _tpc_cleanup(self):
-        """Performs cleanup operations to support tpc_finish and tpc_abort."""
-        self._conflicts.clear()
-        if not self._synch:
-            self._flush_invalidations()
-        self._needs_to_join = True
-        self._registered_objects = []
+    def _invalidate_creating(self, creating=None):
+        """Disown any objects newly saved in an uncommitted transaction."""
+        if creating is None:
+            creating = self._creating
+            self._creating = []
-    def sync(self):
-        """Manually update the view on the database."""
-        self._txn_mgr.get().abort()
-        sync = getattr(self._storage, 'sync', 0)
-        if sync:
-            sync()
-        self._flush_invalidations()
+        for oid in creating:
+            o = self._cache.get(oid)
+            if o is not None:
+                del self._cache[oid]
+                del o._p_jar
+                del o._p_oid
-    def getDebugInfo(self):
-        """Returns a tuple with different items for debugging the
-        connection.
-        """
-        return self._debug_info
+    def tpc_vote(self, transaction):
+        """Verify that a data manager can commit the transaction."""
+        try:
+            vote = self._storage.tpc_vote
+        except AttributeError:
+            return
+        s = vote(transaction)
+        self._handle_serial(s)
-    def setDebugInfo(self, *args):
-        """Add the given items to the debug information of this connection."""
-        self._debug_info = self._debug_info + args
+    def tpc_finish(self, transaction):
+        """Indicate confirmation that the transaction is done."""
+        def callback(tid):
+            d = {}
+            for oid in self._modified:
+                d[oid] = 1
+            self._db.invalidate(tid, d, self)
+        self._storage.tpc_finish(transaction, callback)
+        self._tpc_cleanup()
-    def getTransferCounts(self, clear=False):
-        """Returns the number of objects loaded and stored."""
-        res = self._load_count, self._store_count
-        if clear:
-            self._load_count = 0
-            self._store_count = 0
-        return res
+    def sortKey(self):
+        """Return a consistent sort key for this connection."""
+        return "%s:%s" % (self._storage.sortKey(), id(self))
-    ##############################################
+    # Data manager (IDataManager) methods
+    ##########################################################################
+    ##########################################################################
+    # Transaction-manager synchronization -- ISynchronizer
+    def beforeCompletion(self, txn):
+        # We don't do anything before a commit starts.
+        pass
+    def afterCompletion(self, txn):
+        self._flush_invalidations()
+    # Transaction-manager synchronization -- ISynchronizer
+    ##########################################################################
+    ##########################################################################
     # persistent.interfaces.IPersistentDatamanager
     def oldstate(self, obj, tid):
@@ -818,12 +795,24 @@
     def _register(self, obj=None):
-        if obj is not None:
-            self._registered_objects.append(obj)
+        # The order here is important.  We need to join before
+        # registering the object, because joining may take a
+        # savepoint, and the savepoint should not reflect the change
+        # to the object.
         if self._needs_to_join:
             self._needs_to_join = False
+        if obj is not None:
+            self._registered_objects.append(obj)
+    # persistent.interfaces.IPersistentDatamanager
+    ##########################################################################
+    ##########################################################################
     # PROTECTED stuff (used by e.g. ZODB.DB.DB)
     def _cache_items(self):
@@ -862,7 +851,7 @@
         # and Storage.
         self._db = odb
-        self._storage = odb._storage
+        self._normal_storage = self._storage = odb._storage
         self.new_oid = odb._storage.new_oid
         self._opened = time()
         if synch is not None:
@@ -892,6 +881,7 @@
         cache_size = self._cache.cache_size
         self._cache = cache = PickleCache(self, cache_size)
+    ##########################################################################
     # Python protocol
     def __repr__(self):
@@ -901,6 +891,10 @@
             ver = ''
         return '<Connection at %08x%s>' % (positive_id(self), ver)
+    # Python protocol
+    ##########################################################################
+    ##########################################################################
     # DEPRECATION candidates
     __getitem__ = get
@@ -916,33 +910,6 @@
         except KeyError:
             return self.getVersion()
-    def getVersion(self):
-        """Returns the version this connection is attached to."""
-        if self._storage is None:
-            raise ConnectionStateError("The database connection is closed")
-        return self._version
-    def setklassstate(self, obj):
-        # Special case code to handle ZClasses, I think.
-        # Called the cache when an object of type type is invalidated.
-        try:
-            oid = obj._p_oid
-            p, serial = self._storage.load(oid, self._version)
-            # We call getGhost(), but we actually get a non-ghost back.
-            # The object is a class, which can't actually be ghosted.
-            copy = self._reader.getGhost(p)
-            obj.__dict__.clear()
-            obj.__dict__.update(copy.__dict__)
-            obj._p_oid = oid
-            obj._p_jar = self
-            obj._p_changed = 0
-            obj._p_serial = serial
-        except:
-            self._log.error("setklassstate failed", exc_info=sys.exc_info())
-            raise
     def exchange(self, old, new):
         # called by a ZClasses method that isn't executed by the test suite
         oid = old._p_oid
@@ -952,8 +919,20 @@
         self._cache[oid] = new
+    # DEPRECATION candidates
+    ##########################################################################
+    ##########################################################################
     # DEPRECATED methods
+    def cacheFullSweep(self, dt=None):
+        deprecated36("cacheFullSweep is deprecated. "
+                     "Use cacheMinimize instead.")
+        if dt is None:
+            self._cache.full_sweep()
+        else:
+            self._cache.full_sweep(dt)
     def getTransaction(self):
         """Get the current transaction for this connection.
@@ -986,16 +965,150 @@
         return self._txn_mgr
-    def cacheFullSweep(self, dt=None):
-        deprecated36("cacheFullSweep is deprecated. "
-                     "Use cacheMinimize instead.")
-        if dt is None:
-            self._cache.full_sweep()
-        else:
-            self._cache.full_sweep(dt)
+    # DEPRECATED methods
+    ##########################################################################
-    def cacheMinimize(self, dt=DEPRECATED_ARGUMENT):
-        """Deactivate all unmodified objects in the cache."""
-        if dt is not DEPRECATED_ARGUMENT:
-            deprecated36("cacheMinimize() dt= is ignored.")
-        self._cache.minimize()
+    #####################################################################
+    # Savepoint support
+    def savepoint(self):
+        if self._savepoint_storage is None:
+            self._savepoint_storage = TmpStore(self._version,
+                                               self._normal_storage)
+            self._storage = self._savepoint_storage
+        self._creating = []
+        self._commit(None)
+        self._storage.creating.extend(self._creating)
+        del self._creating[:]
+        self._registered_objects = []
+        state = self._storage.position, self._storage.index.copy()
+        return Savepoint(self, state)
+    def _rollback(self, state):
+        self._abort()
+        src = self._storage
+        self._cache.invalidate(src.index)
+        src.reset(*state)
+    def _commit_savepoint(self, transaction):
+        """Commit all changes made in subtransactions and begin 2-phase commit
+        """
+        src = self._savepoint_storage
+        self._storage = self._normal_storage
+        self._savepoint_storage = None
+        self._log.debug("Commiting savepoints of size %s", src.getSize())
+        oids = src.index.keys()
+        # Copy invalidating and creating info from temporary storage:
+        self._modified.extend(oids)
+        self._creating.extend(src.creating)
+        for oid in oids:
+            data, serial = src.load(oid, src)
+            s = self._storage.store(oid, serial, data,
+                                    self._version, transaction)
+            self._handle_serial(s, oid, change=False)
+        src.close()
+    def _abort_savepoint(self):
+        """Discard all subtransaction data."""
+        src = self._savepoint_storage
+        self._storage = self._normal_storage
+        self._savepoint_storage = None
+        # Note: If we invalidate a non-ghostifiable object (i.e. a
+        # persistent class), the object will immediately reread its
+        # state.  That means that the following call could result in a
+        # call to self.setstate, which, of course, must succeed.  In
+        # general, it would be better if the read could be delayed
+        # until the start of the next transaction.  If we read at the
+        # end of a transaction and if the object was invalidated
+        # during this transaction, then we'll read non-current data,
+        # which we'll discard later in transaction finalization.  We
+        # could, theoretically queue this invalidation by calling
+        # self.invalidate.  Unfortunately, attempts to make that
+        # change resulted in mysterious test failures.  It's pretty
+        # unlikely that the object we are invalidating was invalidated
+        # by another thread, so the risk of a reread is pretty low.
+        # It's really not worth the effort to pursue this.
+        self._cache.invalidate(src.index)
+        self._invalidate_creating(src.creating)
+        src.close()
+    # Savepoint support
+    #####################################################################
+class Savepoint:
+    def __init__(self, datamanager, state):
+        self.datamanager = datamanager
+        self.state = state
+    def rollback(self):
+        self.datamanager._rollback(self.state)
+class TmpStore:
+    """A storage-like thing to support savepoints."""
+    def __init__(self, base_version, storage):
+        self._storage = storage
+        for method in (
+            'getName', 'new_oid', 'modifiedInVersion', 'getSize', 
+            'undoLog', 'versionEmpty', 'sortKey',
+            ):
+            setattr(self, method, getattr(storage, method))
+        self._base_version = base_version
+        self._file = tempfile.TemporaryFile()
+        # position: current file position
+        # _tpos: file position at last commit point
+        self.position = 0L
+        # index: map oid to pos of last committed version
+        self.index = {}
+        self.creating = []
+    def __len__(self):
+        return len(self.index)
+    def close(self):
+        self._file.close()
+    def load(self, oid, version):
+        pos = self.index.get(oid)
+        if pos is None:
+            return self._storage.load(oid, self._base_version)
+        self._file.seek(pos)
+        h = self._file.read(8)
+        oidlen = u64(h)
+        read_oid = self._file.read(oidlen)
+        if read_oid != oid:
+            raise POSException.StorageSystemError('Bad temporary storage')
+        h = self._file.read(16)
+        size = u64(h[8:])
+        serial = h[:8]
+        return self._file.read(size), serial
+    def store(self, oid, serial, data, version, transaction):
+        # we have this funny signature so we can reuse the normal non-commit
+        # commit logic
+        assert version == self._base_version
+        self._file.seek(self.position)
+        l = len(data)
+        if serial is None:
+            serial = z64
+        header = p64(len(oid)) + oid + serial + p64(l)
+        self._file.write(header)
+        self._file.write(data)
+        self.index[oid] = self.position
+        self.position += l + len(header)
+        return serial
+    def reset(self, position, index):
+        self._file.truncate(position)
+        self.position = position
+        self.index = index

Deleted: ZODB/branches/3.4/src/ZODB/TmpStore.py
--- ZODB/branches/3.4/src/ZODB/TmpStore.py	2005-04-24 01:28:59 UTC (rev 30128)
+++ ZODB/branches/3.4/src/ZODB/TmpStore.py	2005-04-24 01:29:02 UTC (rev 30129)
@@ -1,126 +0,0 @@
-# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
-# All Rights Reserved.
-# This software is subject to the provisions of the Zope Public License,
-# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
-from ZODB import POSException
-from ZODB.utils import p64, u64, z64
-import tempfile
-class TmpStore:
-    """A storage to support subtransactions."""
-    _bver = ''
-    def __init__(self, base_version, storage):
-        self._transaction = None
-        self._storage = storage
-        if base_version:
-            self._bver = base_version
-        self._file = tempfile.TemporaryFile()
-        # _pos: current file position
-        # _tpos: file position at last commit point
-        self._pos = self._tpos = 0L
-        # _index: map oid to pos of last committed version
-        self._index = {}
-        # _tindex: map oid to pos for new updates
-        self._tindex = {}
-        self._creating = []
-    def close(self):
-        self._file.close()
-    def getName(self):
-        return self._storage.getName()
-    def getSize(self):
-        return self._pos
-    def load(self, oid, version):
-        pos = self._index.get(oid)
-        if pos is None:
-            return self._storage.load(oid, self._bver)
-        self._file.seek(pos)
-        h = self._file.read(8)
-        oidlen = u64(h)
-        read_oid = self._file.read(oidlen)
-        if read_oid != oid:
-            raise POSException.StorageSystemError('Bad temporary storage')
-        h = self._file.read(16)
-        size = u64(h[8:])
-        serial = h[:8]
-        return self._file.read(size), serial
-    def sortKey(self):
-        return self._storage.sortKey()
-    # TODO: clarify difference between self._storage & self._db._storage
-    def modifiedInVersion(self, oid):
-        if self._index.has_key(oid):
-            return self._bver
-        return self._storage.modifiedInVersion(oid)
-    def new_oid(self):
-        return self._storage.new_oid()
-    def registerDB(self, db, limit):
-        pass
-    def store(self, oid, serial, data, version, transaction):
-        if transaction is not self._transaction:
-            raise POSException.StorageTransactionError(self, transaction)
-        self._file.seek(self._pos)
-        l = len(data)
-        if serial is None:
-            serial = z64
-        header = p64(len(oid)) + oid + serial + p64(l)
-        self._file.write(header)
-        self._file.write(data)
-        self._tindex[oid] = self._pos
-        self._pos += l + len(header)
-        return serial
-    def tpc_abort(self, transaction):
-        if transaction is not self._transaction:
-            return
-        self._tindex.clear()
-        self._transaction = None
-        self._pos = self._tpos
-    def tpc_begin(self, transaction):
-        if self._transaction is transaction:
-            return
-        self._transaction = transaction
-        self._tindex.clear() # Just to be sure!
-        self._pos = self._tpos
-    def tpc_vote(self, transaction):
-        pass
-    def tpc_finish(self, transaction, f=None):
-        if transaction is not self._transaction:
-            return
-        if f is not None:
-            f()
-        self._index.update(self._tindex)
-        self._tindex.clear()
-        self._tpos = self._pos
-    def undoLog(self, first, last, filter=None):
-        return ()
-    def versionEmpty(self, version):
-        # TODO: what is this supposed to do?
-        if version == self._bver:
-            return len(self._index)

Modified: ZODB/branches/3.4/src/ZODB/tests/testConnection.py
--- ZODB/branches/3.4/src/ZODB/tests/testConnection.py	2005-04-24 01:28:59 UTC (rev 30128)
+++ ZODB/branches/3.4/src/ZODB/tests/testConnection.py	2005-04-24 01:29:02 UTC (rev 30129)
@@ -294,7 +294,7 @@
         >>> cn.close()  # this was succeeding
         Traceback (most recent call last):
-        ConnectionStateError: Cannot close a connection with a pending subtransaction
+        ConnectionStateError: Cannot close a connection joined to a transaction
         Again this leaves the connection as it was.
         >>> transaction.commit()

Added: ZODB/branches/3.4/src/ZODB/tests/testConnectionSavepoint.py
--- ZODB/branches/3.4/src/ZODB/tests/testConnectionSavepoint.py	2005-04-24 01:28:59 UTC (rev 30128)
+++ ZODB/branches/3.4/src/ZODB/tests/testConnectionSavepoint.py	2005-04-24 01:29:02 UTC (rev 30129)
@@ -0,0 +1,29 @@
+# Copyright (c) 2004 Zope Corporation and Contributors.
+# All Rights Reserved.
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
+"""Tests of savepoint feature
+import unittest
+from zope.testing import doctest
+def test_suite():
+    return unittest.TestSuite((
+        doctest.DocFileSuite('testConnectionSavepoint.txt'),
+        ))
+if __name__ == '__main__':
+    unittest.main(defaultTest='test_suite')

Property changes on: ZODB/branches/3.4/src/ZODB/tests/testConnectionSavepoint.py
Name: svn:keywords
   + Id
Name: svn:eol-style
   + native

Added: ZODB/branches/3.4/src/ZODB/tests/testConnectionSavepoint.txt
--- ZODB/branches/3.4/src/ZODB/tests/testConnectionSavepoint.txt	2005-04-24 01:28:59 UTC (rev 30128)
+++ ZODB/branches/3.4/src/ZODB/tests/testConnectionSavepoint.txt	2005-04-24 01:29:02 UTC (rev 30129)
@@ -0,0 +1,137 @@
+Savepoints provide a way to save to disk intermediate work done during
+a transaction allowing:
+- partial transaction (subtransaction) rollback (abort)
+- state of saved objects to be freed, freeing on-line memory for other
+  uses
+Savepoints make it possible to write atomic subroutines that don't
+make top-level transaction commitments.
+To demonstrate how savepoints work with transactions, we'll show an example.
+    >>> import ZODB.tests.util
+    >>> db = ZODB.tests.util.DB()
+    >>> connection = db.open()
+    >>> root = connection.root()
+    >>> root['name'] = 'bob'
+As with other data managers, we can commit changes:
+    >>> import transaction
+    >>> transaction.commit()
+    >>> root['name']
+    'bob'
+and abort changes:
+    >>> root['name'] = 'sally'
+    >>> root['name']
+    'sally'
+    >>> transaction.abort()
+    >>> root['name']
+    'bob'
+Now, let's look at an application that manages funds for people.
+It allows deposits and debits to be entered for multiple people.
+It accepts a sequence of entries and generates a sequence of status
+messages.  For each entry, it applies the change and then validates
+the user's account.  If the user's account is invalid, we roll back
+the change for that entry.  The success or failure of an entry is 
+indicated in the output status. First we'll initialize some accounts:
+    >>> root['bob-balance'] = 0.0
+    >>> root['bob-credit'] = 0.0
+    >>> root['sally-balance'] = 0.0
+    >>> root['sally-credit'] = 100.0
+    >>> transaction.commit()
+Now, we'll define a validation function to validate an account:
+    >>> def validate_account(name):
+    ...     if root[name+'-balance'] + root[name+'-credit'] < 0:
+    ...         raise ValueError('Overdrawn', name)
+And a function to apply entries.  If the function fails in some
+unexpected way, it rolls back all of its changes and
+prints the error:
+    >>> def apply_entries(entries):
+    ...     savepoint = transaction.savepoint()
+    ...     try:
+    ...         for name, amount in entries:
+    ...             entry_savepoint = transaction.savepoint()
+    ...             try:
+    ...                 root[name+'-balance'] += amount
+    ...                 validate_account(name)
+    ...             except ValueError, error:
+    ...                 entry_savepoint.rollback()
+    ...                 print 'Error', str(error)
+    ...             else:
+    ...                 print 'Updated', name
+    ...     except Exception, error:
+    ...         savepoint.rollback()
+    ...         print 'Unexpected exception', error
+Now let's try applying some entries:
+    >>> apply_entries([
+    ...     ('bob',   10.0),
+    ...     ('sally', 10.0),
+    ...     ('bob',   20.0),
+    ...     ('sally', 10.0),
+    ...     ('bob',   -100.0),
+    ...     ('sally', -100.0),
+    ...     ])
+    Updated bob
+    Updated sally
+    Updated bob
+    Updated sally
+    Error ('Overdrawn', 'bob')
+    Updated sally
+    >>> root['bob-balance']
+    30.0
+    >>> root['sally-balance']
+    -80.0
+If we provide entries that cause an unexpected error:
+    >>> apply_entries([
+    ...     ('bob',   10.0),
+    ...     ('sally', 10.0),
+    ...     ('bob',   '20.0'),
+    ...     ('sally', 10.0),
+    ...     ])
+    Updated bob
+    Updated sally
+    Unexpected exception unsupported operand type(s) for +=: 'float' and 'str'
+Because apply_entries used a savepoint for the entire function,
+it was able to roll back the partial changes without rolling back
+changes made in the previous call to apply_entries:
+    >>> root['bob-balance']
+    30.0
+    >>> root['sally-balance']
+    -80.0
+If we now abort the outer transaction, the earlier changes will go away:
+    >>> transaction.abort()
+    >>> root['bob-balance']
+    0.0
+    >>> root['sally-balance']
+    0.0

Property changes on: ZODB/branches/3.4/src/ZODB/tests/testConnectionSavepoint.txt
Name: svn:eol-style
   + native

Modified: ZODB/branches/3.4/src/ZODB/tests/testZODB.py
--- ZODB/branches/3.4/src/ZODB/tests/testZODB.py	2005-04-24 01:28:59 UTC (rev 30128)
+++ ZODB/branches/3.4/src/ZODB/tests/testZODB.py	2005-04-24 01:29:02 UTC (rev 30129)
@@ -550,13 +550,17 @@
         self.assertEqual(rt['a'], 1)
         rt['b'] = 2
         # Subtransactions don't do tpc_vote, so we poison tpc_begin.
-        poisoned = PoisonedObject(PoisonedJar(break_tpc_begin=True))
-        transaction.get().register(poisoned)
+        poisoned = PoisonedJar()
+        transaction.get().join(poisoned)
+        poisoned.break_savepoint = True
         self.assertRaises(PoisonedError, transaction.get().commit, True)
         # Trying to subtxn-commit again fails too.
-        self.assertRaises(TransactionFailedError, transaction.get().commit, True)
-        self.assertRaises(TransactionFailedError, transaction.get().commit, True)
+        self.assertRaises(TransactionFailedError,
+                          transaction.get().commit, True)
+        self.assertRaises(TransactionFailedError,
+                          transaction.get().commit, True)
         # Top-level commit also fails.
         self.assertRaises(TransactionFailedError, transaction.get().commit)
@@ -568,6 +572,7 @@
         # also raises TransactionFailedError.
         self.assertRaises(TransactionFailedError, rt.__setitem__, 'b', 2)
         # Clean up via abort(), and try again.
         rt['a'] = 1
@@ -576,13 +581,18 @@
         # Cleaning up via begin() should also work.
         rt['a'] = 2
-        transaction.get().register(poisoned)
+        poisoned = PoisonedJar()
+        transaction.get().join(poisoned)
+        poisoned.break_savepoint = True
         self.assertRaises(PoisonedError, transaction.get().commit, True)
-        self.assertRaises(TransactionFailedError, transaction.get().commit, True)
+        self.assertRaises(TransactionFailedError,
+                          transaction.get().commit, True)
         # The change to rt['a'] is lost.
         self.assertEqual(rt['a'], 1)
         # Trying to modify an object also fails.
         self.assertRaises(TransactionFailedError, rt.__setitem__, 'b', 2)
         # Clean up via begin(), and try again.
         rt['a'] = 2
@@ -603,9 +613,11 @@
 # PoisonedJar arranges to raise exceptions from interesting places.
 # For whatever reason, subtransaction commits don't call tpc_vote.
 class PoisonedJar:
-    def __init__(self, break_tpc_begin=False, break_tpc_vote=False):
+    def __init__(self, break_tpc_begin=False, break_tpc_vote=False,
+                 break_savepoint=False):
         self.break_tpc_begin = break_tpc_begin
         self.break_tpc_vote = break_tpc_vote
+        self.break_savepoint = break_savepoint
     def sortKey(self):
         return str(id(self))
@@ -620,14 +632,10 @@
         if self.break_tpc_vote:
             raise PoisonedError("tpc_vote fails")
-    # commit_sub is needed else this jar is ignored during subtransaction
-    # commit.
-    def commit_sub(*args):
-        pass
+    def savepoint(self):
+        if self.break_savepoint:
+            raise PoisonedError("savepoint fails")        
-    def abort_sub(*args):
-        pass
     def commit(*args):

Modified: ZODB/branches/3.4/src/ZODB/tests/test_datamanageradapter.py
--- ZODB/branches/3.4/src/ZODB/tests/test_datamanageradapter.py	2005-04-24 01:28:59 UTC (rev 30128)
+++ ZODB/branches/3.4/src/ZODB/tests/test_datamanageradapter.py	2005-04-24 01:29:02 UTC (rev 30129)
@@ -238,569 +238,6 @@
-def test_commit_w_subtransactions():
-    """
-    So, we have a data manager:
-    >>> dm = DataManager()
-    and we do some work that modifies uncommited state:
-    >>> dm.inc()
-    >>> dm.state, dm.delta
-    (0, 1)
-    Now we'll commit the changes in a subtransaction.  When the data
-    manager joins a transaction, the transaction will create an
-    adapter.
-    >>> dma = DataManagerAdapter(dm)
-    and register it as a modified object. At commit time, the
-    transaction will get the "jar" like this:
-    >>> jar = getattr(dma, '_p_jar', dma)
-    and, of course, the jar and the adapter will be the same:
-    >>> jar is dma
-    True
-    The transaction will call tpc_begin:
-    >>> t1 = '1'
-    >>> jar.tpc_begin(t1, 1) # 1 -> subtxn
-    Then the transaction will call commit on the jar:
-    >>> jar.commit(t1)
-    This doesn't actually do anything. :)
-    >>> dm.state, dm.delta
-    (0, 1)
-    The transaction will then call tpc_vote:
-    >>> jar.tpc_vote(t1)
-    This doesn't do anything either, because zodb4 data managers don't
-    actually do two-phase commit for subtransactions.
-    >>> dm.state, dm.delta
-    (0, 1)
-    Finally, we call tpc_finish. This does actally create a savepoint,
-    but we can't really tell that from outside.
-    >>> jar.tpc_finish(t1)
-    >>> dm.state, dm.delta
-    (0, 1)
-    We'll do more of the above:
-    >>> dm.inc()
-    >>> dm.state, dm.delta
-    (0, 2)
-    >>> jar.tpc_begin(t1, 1) # 1 -> subtxn
-    >>> jar.commit(t1)
-    >>> jar.tpc_vote(t1)
-    >>> jar.tpc_finish(t1)
-    >>> dm.state, dm.delta
-    (0, 2)
-    >>> dm.inc()
-    >>> dm.state, dm.delta
-    (0, 3)
-    >>> jar.tpc_begin(t1, 1) # 1 -> subtxn
-    >>> jar.commit(t1)
-    >>> jar.tpc_vote(t1)
-    >>> jar.tpc_finish(t1)
-    >>> dm.state, dm.delta
-    (0, 3)
-    Note that the bove works *because* the same transaction is used
-    for each subtransaction.
-    Finally, we'll do a little more work:
-    >>> dm.inc()
-    >>> dm.inc()
-    >>> dm.state, dm.delta
-    (0, 5)
-    and then commit the top-level transaction.
-    The transaction  will actually go through the steps for a subtransaction:
-    >>> jar.tpc_begin(t1, 1) # 1 -> subtxn
-    >>> jar.commit(t1)
-    >>> jar.tpc_vote(t1)
-    >>> jar.tpc_finish(t1)
-    And then call commit_sub:
-    >>> jar.commit_sub(t1)
-    As usual, this doesn't actually do anything. ;)
-    >>> dm.state, dm.delta
-    (0, 5)
-    The transaction manager doesn's call tpc_begin, because commit_sub
-    implies the start of two-phase commit. Next, it does call commit:
-    >>> jar.commit(t1)
-    which doesn't do anything.
-    Finally, the transaction calls tpc_vote:
-    >>> jar.tpc_vote(t1)
-    which actually does something (because this is the top-level txn):
-    >>> dm.state, dm.delta
-    (5, 5)
-    >>> dm.prepared
-    True
-    Finally, tpc_finish is called:
-    >>> jar.tpc_finish(t1)
-    and the data manager finishes the two-phase commit:
-    >>> dm.state, dm.delta
-    (5, 0)
-    >>> dm.prepared
-    False
-    """
-def test_commit_w_subtransactions_featuring_subtransaction_abort():
-    """
-    So, we have a data manager:
-    >>> dm = DataManager()
-    and we do some work that modifies uncommited state:
-    >>> dm.inc()
-    >>> dm.state, dm.delta
-    (0, 1)
-    Now we'll commit the changes in a subtransaction.  When the data
-    manager joins a transaction, the transaction will create an
-    adapter.
-    >>> dma = DataManagerAdapter(dm)
-    and register it as a modified object. At commit time, the
-    transaction will get the "jar" like this:
-    >>> jar = getattr(dma, '_p_jar', dma)
-    and, of course, the jar and the adapter will be the same:
-    >>> jar is dma
-    True
-    The transaction will call tpc_begin:
-    >>> t1 = '1'
-    >>> jar.tpc_begin(t1, 1) # 1 -> subtxn
-    Then the transaction will call commit on the jar:
-    >>> jar.commit(t1)
-    This doesn't actually do anything. :)
-    >>> dm.state, dm.delta
-    (0, 1)
-    The transaction will then call tpc_vote:
-    >>> jar.tpc_vote(t1)
-    This doesn't do anything either, because zodb4 data managers don't
-    actually do two-phase commit for subtransactions.
-    >>> dm.state, dm.delta
-    (0, 1)
-    Finally, we call tpc_finish. This does actally create a savepoint,
-    but we can't really tell that from outside.
-    >>> jar.tpc_finish(t1)
-    >>> dm.state, dm.delta
-    (0, 1)
-    We'll do more of the above:
-    >>> dm.inc()
-    >>> dm.state, dm.delta
-    (0, 2)
-    >>> jar.tpc_begin(t1, 1) # 1 -> subtxn
-    >>> jar.commit(t1)
-    >>> jar.tpc_vote(t1)
-    >>> jar.tpc_finish(t1)
-    >>> dm.state, dm.delta
-    (0, 2)
-    >>> dm.inc()
-    >>> dm.state, dm.delta
-    (0, 3)
-    But then we'll decide to abort a subtransaction.
-    The transaction will just call abort as usual:
-    >>> jar.abort(t1)
-    This will cause a rollback to the last savepoint:
-    >>> dm.state, dm.delta
-    (0, 2)
-    Then we do more work:
-    >>> dm.inc()
-    >>> dm.state, dm.delta
-    (0, 3)
-    >>> jar.tpc_begin(t1, 1) # 1 -> subtxn
-    >>> jar.commit(t1)
-    >>> jar.tpc_vote(t1)
-    >>> jar.tpc_finish(t1)
-    >>> dm.state, dm.delta
-    (0, 3)
-    Note that the bove works *because* the same transaction is used
-    for each subtransaction.
-    Finally, we'll do a little more work:
-    >>> dm.inc()
-    >>> dm.inc()
-    >>> dm.state, dm.delta
-    (0, 5)
-    and then commit the top-level transaction.
-    The transaction  will actually go through the steps for a subtransaction:
-    >>> jar.tpc_begin(t1, 1) # 1 -> subtxn
-    >>> jar.commit(t1)
-    >>> jar.tpc_vote(t1)
-    >>> jar.tpc_finish(t1)
-    And then call commit_sub:
-    >>> jar.commit_sub(t1)
-    As usual, this doesn't actually do anything. ;)
-    >>> dm.state, dm.delta
-    (0, 5)
-    The transaction manager doesn's call tpc_begin, because commit_sub
-    implies the start of two-phase commit. Next, it does call commit:
-    >>> jar.commit(t1)
-    which doesn't do anything.
-    Finally, the transaction calls tpc_vote:
-    >>> jar.tpc_vote(t1)
-    which actually does something (because this is the top-level txn):
-    >>> dm.state, dm.delta
-    (5, 5)
-    >>> dm.prepared
-    True
-    Finally, tpc_finish is called:
-    >>> jar.tpc_finish(t1)
-    and the data manager finishes the two-phase commit:
-    >>> dm.state, dm.delta
-    (5, 0)
-    >>> dm.prepared
-    False
-    """
-def test_abort_w_subtransactions():
-    """
-    So, we have a data manager:
-    >>> dm = DataManager()
-    and we do some work that modifies uncommited state:
-    >>> dm.inc()
-    >>> dm.state, dm.delta
-    (0, 1)
-    Now we'll commit the changes in a subtransaction.  When the data
-    manager joins a transaction, the transaction will create an
-    adapter.
-    >>> dma = DataManagerAdapter(dm)
-    and register it as a modified object. At commit time, the
-    transaction will get the "jar" like this:
-    >>> jar = getattr(dma, '_p_jar', dma)
-    and, of course, the jar and the adapter will be the same:
-    >>> jar is dma
-    True
-    The transaction will call tpc_begin:
-    >>> t1 = '1'
-    >>> jar.tpc_begin(t1, 1) # 1 -> subtxn
-    Then the transaction will call commit on the jar:
-    >>> jar.commit(t1)
-    This doesn't actually do anything. :)
-    >>> dm.state, dm.delta
-    (0, 1)
-    The transaction will then call tpc_vote:
-    >>> jar.tpc_vote(t1)
-    This doesn't do anything either, because zodb4 data managers don't
-    actually do two-phase commit for subtransactions.
-    >>> dm.state, dm.delta
-    (0, 1)
-    Finally, we call tpc_finish. This does actally create a savepoint,
-    but we can't really tell that from outside.
-    >>> jar.tpc_finish(t1)
-    >>> dm.state, dm.delta
-    (0, 1)
-    We'll do more of the above:
-    >>> dm.inc()
-    >>> dm.state, dm.delta
-    (0, 2)
-    >>> jar.tpc_begin(t1, 1) # 1 -> subtxn
-    >>> jar.commit(t1)
-    >>> jar.tpc_vote(t1)
-    >>> jar.tpc_finish(t1)
-    >>> dm.state, dm.delta
-    (0, 2)
-    >>> dm.inc()
-    >>> dm.state, dm.delta
-    (0, 3)
-    >>> jar.tpc_begin(t1, 1) # 1 -> subtxn
-    >>> jar.commit(t1)
-    >>> jar.tpc_vote(t1)
-    >>> jar.tpc_finish(t1)
-    >>> dm.state, dm.delta
-    (0, 3)
-    Note that the bove works *because* the same transaction is used
-    for each subtransaction.
-    Finally, we'll do a little more work:
-    >>> dm.inc()
-    >>> dm.inc()
-    >>> dm.state, dm.delta
-    (0, 5)
-    and then abort the top-level transaction.
-    The transaction first call abort on the jar:
-    >>> jar.abort(t1)
-    This will have the effect of aborting the subtrancation:
-    >>> dm.state, dm.delta
-    (0, 3)
-    Then the transaction will call abort_sub:
-    >>> jar.abort_sub(t1)
-    This will abort all of the subtransactions:
-    >>> dm.state, dm.delta
-    (0, 0)
-    """
-def test_tpc_abort_w_subtransactions_featuring_subtransaction_abort():
-    """
-    So, we have a data manager:
-    >>> dm = DataManager()
-    and we do some work that modifies uncommited state:
-    >>> dm.inc()
-    >>> dm.state, dm.delta
-    (0, 1)
-    Now we'll commit the changes in a subtransaction.  When the data
-    manager joins a transaction, the transaction will create an
-    adapter.
-    >>> dma = DataManagerAdapter(dm)
-    and register it as a modified object. At commit time, the
-    transaction will get the "jar" like this:
-    >>> jar = getattr(dma, '_p_jar', dma)
-    and, of course, the jar and the adapter will be the same:
-    >>> jar is dma
-    True
-    The transaction will call tpc_begin:
-    >>> t1 = '1'
-    >>> jar.tpc_begin(t1, 1) # 1 -> subtxn
-    Then the transaction will call commit on the jar:
-    >>> jar.commit(t1)
-    This doesn't actually do anything. :)
-    >>> dm.state, dm.delta
-    (0, 1)
-    The transaction will then call tpc_vote:
-    >>> jar.tpc_vote(t1)
-    This doesn't do anything either, because zodb4 data managers don't
-    actually do two-phase commit for subtransactions.
-    >>> dm.state, dm.delta
-    (0, 1)
-    Finally, we call tpc_finish. This does actally create a savepoint,
-    but we can't really tell that from outside.
-    >>> jar.tpc_finish(t1)
-    >>> dm.state, dm.delta
-    (0, 1)
-    We'll do more of the above:
-    >>> dm.inc()
-    >>> dm.state, dm.delta
-    (0, 2)
-    >>> jar.tpc_begin(t1, 1) # 1 -> subtxn
-    >>> jar.commit(t1)
-    >>> jar.tpc_vote(t1)
-    >>> jar.tpc_finish(t1)
-    >>> dm.state, dm.delta
-    (0, 2)
-    >>> dm.inc()
-    >>> dm.state, dm.delta
-    (0, 3)
-    But then we'll decide to abort a subtransaction.
-    The transaction will just call abort as usual:
-    >>> jar.abort(t1)
-    This will cause a rollback to the last savepoint:
-    >>> dm.state, dm.delta
-    (0, 2)
-    Then we do more work:
-    >>> dm.inc()
-    >>> dm.state, dm.delta
-    (0, 3)
-    >>> jar.tpc_begin(t1, 1) # 1 -> subtxn
-    >>> jar.commit(t1)
-    >>> jar.tpc_vote(t1)
-    >>> jar.tpc_finish(t1)
-    >>> dm.state, dm.delta
-    (0, 3)
-    Note that the bove works *because* the same transaction is used
-    for each subtransaction.
-    Finally, we'll do a little more work:
-    >>> dm.inc()
-    >>> dm.inc()
-    >>> dm.state, dm.delta
-    (0, 5)
-    and then commit the top-level transaction.
-    The transaction  will actually go through the steps for a subtransaction:
-    >>> jar.tpc_begin(t1, 1) # 1 -> subtxn
-    >>> jar.commit(t1)
-    >>> jar.tpc_vote(t1)
-    >>> jar.tpc_finish(t1)
-    And then call commit_sub:
-    >>> jar.commit_sub(t1)
-    As usual, this doesn't actually do anything. ;)
-    >>> dm.state, dm.delta
-    (0, 5)
-    The transaction manager doesn's call tpc_begin, because commit_sub
-    implies the start of two-phase commit. Next, it does call commit:
-    >>> jar.commit(t1)
-    which doesn't do anything.
-    Finally, the transaction calls tpc_vote:
-    >>> jar.tpc_vote(t1)
-    which actually does something (because this is the top-level txn):
-    >>> dm.state, dm.delta
-    (5, 5)
-    >>> dm.prepared
-    True
-    Now, at the last minute, the transaction is aborted (possibly due
-    to a "no vote" from another data manager):
-    >>> jar.tpc_abort(t1)
-    An the changes are undone:
-    >>> dm.state, dm.delta
-    (0, 0)
-    >>> dm.prepared
-    False
-    """
 def test_suite():
     return DocTestSuite()

Modified: ZODB/branches/3.4/src/transaction/__init__.py
--- ZODB/branches/3.4/src/transaction/__init__.py	2005-04-24 01:28:59 UTC (rev 30128)
+++ ZODB/branches/3.4/src/transaction/__init__.py	2005-04-24 01:29:02 UTC (rev 30129)
@@ -24,6 +24,7 @@
 begin = manager.begin
 commit = manager.commit
 abort = manager.abort
+savepoint = manager.savepoint
 def get_transaction():
     from ZODB.utils import deprecated36

Modified: ZODB/branches/3.4/src/transaction/_manager.py
--- ZODB/branches/3.4/src/transaction/_manager.py	2005-04-24 01:28:59 UTC (rev 30128)
+++ ZODB/branches/3.4/src/transaction/_manager.py	2005-04-24 01:29:02 UTC (rev 30129)
@@ -67,6 +67,9 @@
     def abort(self, sub=False):
+    def savepoint(self, optimistic=False):
+        return self.get().savepoint(optimistic)
 class ThreadTransactionManager(TransactionManager):
     """Thread-aware transaction manager.

Modified: ZODB/branches/3.4/src/transaction/_transaction.py
--- ZODB/branches/3.4/src/transaction/_transaction.py	2005-04-24 01:28:59 UTC (rev 30128)
+++ ZODB/branches/3.4/src/transaction/_transaction.py	2005-04-24 01:29:02 UTC (rev 30129)
@@ -30,6 +30,8 @@
+Note: Subtransactions are deprecated!
 A subtransaction applies the transaction notion recursively.  It
 allows a set of modifications within a transaction to be committed or
 aborted as a group.  A subtransaction is a strictly local activity;
@@ -82,7 +84,7 @@
 tpc_begin() on each resource manager before calling commit() on any of
-    1. tpc_begin(txn, subtransaction=False)
+    1. tpc_begin(txn)
     2. commit(txn)
     3. tpc_vote(txn)
     4. tpc_finish(txn)
@@ -90,6 +92,8 @@
 Subtransaction commit
+Note: Subtransactions are deprecated!
 When a subtransaction commits, the protocol is different.
 1. tpc_begin() is passed a second argument, which indicates that a
@@ -128,8 +132,7 @@
 call abort().
 Once uncommitted objects are aborted, tpc_abort() or abort_sub() is
-called on each resource manager.  abort_sub() is called if the
-resource manager was involved in a subtransaction.
+called on each resource manager.
@@ -213,14 +216,6 @@
         self.log = logging.getLogger("txn.%d" % thread.get_ident())
         self.log.debug("new transaction")
-        # _sub contains all of the resource managers involved in
-        # subtransactions.  It maps id(a resource manager) to the resource
-        # manager.
-        self._sub = {}
-        # _nonsub contains all the resource managers that do not support
-        # subtransactions that were involved in subtransaction commits.
-        self._nonsub = {}
         # If a commit fails, the traceback is saved in _failure_traceback.
         # If another attempt is made to commit, TransactionFailedError is
         # raised, incorporating this traceback.
@@ -231,6 +226,9 @@
         # inefficient for FIFO access of this kind.
         self._before_commit = []
+        # Keep track of the last savepoint
+        self._last_savepoint = None
     # Raise TransactionFailedError, due to commit()/join()/register()
     # getting called when the current transaction has already suffered
     # a commit failure.
@@ -258,6 +256,34 @@
             resource = DataManagerAdapter(resource)
+        if self._last_savepoint is not None:
+            self._last_savepoint.join(resource)
+    def savepoint(self, optimistic=False):
+        if self.status is Status.COMMITFAILED:
+            self._prior_commit_failed() # doesn't return, it raises
+        try:
+            savepoint = Savepoint(optimistic)
+            for resource in self._resources:
+                savepoint.join(resource)
+        except:
+            self._cleanup(self._resources)
+            self._saveCommitishError() # doesn't return, it raises!
+        if self._last_savepoint is not None:
+            savepoint.previous = self._last_savepoint
+            self._last_savepoint.next = savepoint
+        self._last_savepoint = savepoint
+        return savepoint
+    def _invalidate_last_savepoint(self):
+        # Invalidate the last savepoint and any previous
+        # savepoints. This is done on a commit or abort.
+        if self._last_savepoint is not None:
+            self._last_savepoint._invalidate_previous()
+            self._last_savepoint = None
     def register(self, obj):
         # The old way of registering transaction participants.
@@ -273,10 +299,7 @@
             raise ValueError("Register with no manager")
         adapter = self._adapters.get(manager)
         if adapter is None:
-            if myhasattr(manager, "commit_sub"):
-                adapter = MultiObjectResourceAdapterSub(manager)
-            else:
-                adapter = MultiObjectResourceAdapter(manager)
+            adapter = MultiObjectResourceAdapter(manager)
             self._adapters[manager] = adapter
@@ -286,67 +309,59 @@
             assert id(obj) not in map(id, adapter.objects)
-            # In the presence of subtransactions, an existing adapter
-            # might be in _adapters but not in _resources.
-            if adapter not in self._resources:
-                self._resources.append(adapter)
     def begin(self):
         from ZODB.utils import deprecated36
         deprecated36("Transaction.begin() should no longer be used; use "
                       "the begin() method of a transaction manager.")
-        if (self._resources or
-              self._sub or
-              self._nonsub or
-              self._synchronizers):
+        if (self._resources or self._synchronizers):
         # Else aborting wouldn't do anything, except if _manager is non-None,
         # in which case it would do nothing besides uselessly free() this
         # transaction.
     def commit(self, subtransaction=False):
+        self._invalidate_last_savepoint()
+        if subtransaction:
+            # TODO deprecate subtransactions
+            self.savepoint(1)
+            return
         if self.status is Status.COMMITFAILED:
             self._prior_commit_failed() # doesn't return
-        if not subtransaction:
-            self._callBeforeCommitHooks()
+        self._callBeforeCommitHooks()
-        if not subtransaction and self._sub and self._resources:
-            # This commit is for a top-level transaction that has
-            # previously committed subtransactions.  Do one last
-            # subtransaction commit to clear out the current objects,
-            # then commit all the subjars.
-            self.commit(True)
+        self._synchronizers.map(lambda s: s.beforeCompletion(self))
+        self.status = Status.COMMITTING
-        if not subtransaction:
-            self._synchronizers.map(lambda s: s.beforeCompletion(self))
-            self.status = Status.COMMITTING
-            self._commitResources(subtransaction)
+            self._commitResources()
-            self.status = Status.COMMITFAILED
-            # Save the traceback for TransactionFailedError.
-            ft = self._failure_traceback = StringIO()
-            t, v, tb = sys.exc_info()
-            # Record how we got into commit().
-            traceback.print_stack(sys._getframe(1), None, ft)
-            # Append the stack entries from here down to the exception.
-            traceback.print_tb(tb, None, ft)
-            # Append the exception type and value.
-            ft.writelines(traceback.format_exception_only(t, v))
-            raise t, v, tb
+            self._saveCommitishError() # This raises!
-        if subtransaction:
-            self._resources = []
-        else:
-            self.status = Status.COMMITTED
-            if self._manager:
-                self._manager.free(self)
-            self._synchronizers.map(lambda s: s.afterCompletion(self))
-            self.log.debug("commit")
+        self.status = Status.COMMITTED
+        if self._manager:
+            self._manager.free(self)
+        self._synchronizers.map(lambda s: s.afterCompletion(self))
+        self.log.debug("commit")
+    def _saveCommitishError(self):
+        self.status = Status.COMMITFAILED
+        # Save the traceback for TransactionFailedError.
+        ft = self._failure_traceback = StringIO()
+        t, v, tb = sys.exc_info()
+        # Record how we got into commit().
+        traceback.print_stack(sys._getframe(1), None, ft)
+        # Append the stack entries from here down to the exception.
+        traceback.print_tb(tb, None, ft)
+        # Append the exception type and value.
+        ft.writelines(traceback.format_exception_only(t, v))
+        raise t, v, tb
     def beforeCommitHook(self, hook, *args, **kws):
         self._before_commit.append((hook, args, kws))
@@ -357,31 +372,20 @@
             hook, args, kws = self._before_commit.pop(0)
             hook(*args, **kws)
-    def _commitResources(self, subtransaction):
+    def _commitResources(self):
         # Execute the two-phase commit protocol.
-        L = self._getResourceManagers(subtransaction)
+        L = list(self._resources)
+        L.sort(rm_cmp)
             for rm in L:
-                # If you pass subtransaction=True to tpc_begin(), it
-                # will create a temporary storage for the duration of
-                # the transaction.  To signal that the top-level
-                # transaction is committing, you must then call
-                # commit_sub().
-                if not subtransaction and id(rm) in self._sub:
-                    del self._sub[id(rm)]
-                    rm.commit_sub(self)
-                else:
-                    rm.tpc_begin(self, subtransaction)
+                rm.tpc_begin(self)
             for rm in L:
                 self.log.debug("commit %r" % rm)
-            if not subtransaction:
-                # Not sure why, but it is intentional that you do not
-                # call tpc_vote() for subtransaction commits.
-                for rm in L:
-                    rm.tpc_vote(self)
-                    self._voted[id(rm)] = True
+            for rm in L:
+                rm.tpc_vote(self)
+                self._voted[id(rm)] = True
                 for rm in L:
@@ -401,8 +405,7 @@
-                if not subtransaction:
-                    self._synchronizers.map(lambda s: s.afterCompletion(self))
+                self._synchronizers.map(lambda s: s.afterCompletion(self))
             raise t, v, tb
     def _cleanup(self, L):
@@ -415,68 +418,30 @@
                     self.log.error("Error in abort() on manager %s",
                                    rm, exc_info=sys.exc_info())
         for rm in L:
-            if id(rm) in self._sub:
-                try:
-                    rm.abort_sub(self)
-                except Exception:
-                    self.log.error("Error in abort_sub() on manager %s",
-                                   rm, exc_info=sys.exc_info())
-            else:
-                try:
-                    rm.tpc_abort(self)
-                except Exception:
-                    self.log.error("Error in tpc_abort() on manager %s",
-                                   rm, exc_info=sys.exc_info())
+            try:
+                rm.tpc_abort(self)
+            except Exception:
+                self.log.error("Error in tpc_abort() on manager %s",
+                               rm, exc_info=sys.exc_info())
-    def _getResourceManagers(self, subtransaction):
-        L = []
-        if subtransaction:
-            # If we are in a subtransaction, make sure all resource
-            # managers are placed in either _sub or _nonsub.  When
-            # the top-level transaction commits, we need to merge
-            # these back into the resource set.
-            # If a data manager doesn't support sub-transactions, we
-            # don't do anything with it now.  (That's somewhat okay,
-            # because subtransactions are mostly just an
-            # optimization.)  Save it until the top-level transaction
-            # commits.
-            for rm in self._resources:
-                if myhasattr(rm, "commit_sub"):
-                    self._sub[id(rm)] = rm
-                    L.append(rm)
-                else:
-                    self._nonsub[id(rm)] = rm
-        else:
-            if self._sub or self._nonsub:
-                # Merge all of _sub, _nonsub, and _resources.
-                d = dict(self._sub)
-                d.update(self._nonsub)
-                # TODO: I think _sub and _nonsub are disjoint, and that
-                #       _resources is empty.  If so, we can simplify this code.
-                assert len(d) == len(self._sub) + len(self._nonsub)
-                assert not self._resources
-                for rm in self._resources:
-                    d[id(rm)] = rm
-                L = d.values()
-            else:
-                L = list(self._resources)
-        L.sort(rm_cmp)
-        return L
     def abort(self, subtransaction=False):
-        if not subtransaction:
-            self._synchronizers.map(lambda s: s.beforeCompletion(self))
-        if subtransaction and self._nonsub:
-            from ZODB.POSException import TransactionError
-            raise TransactionError("Resource manager does not support "
-                                   "subtransaction abort")
+        if subtransaction:
+            # TODO deprecate subtransactions
+            if not self._last_savepoint:
+                raise interfaces.InvalidSavepointRollbackError
+            if self._last_savepoint.valid:
+                # We're supposed to be able to call abort(1) multiple
+                # times. Sigh.
+                self._last_savepoint.rollback()
+            return
+        self._invalidate_last_savepoint()
+        self._synchronizers.map(lambda s: s.beforeCompletion(self))
         tb = None
-        for rm in self._resources + self._nonsub.values():
+        for rm in self._resources:
@@ -485,21 +450,12 @@
                 self.log.error("Failed to abort resource manager: %s",
                                rm, exc_info=sys.exc_info())
-        if not subtransaction:
-            for rm in self._sub.values():
-                try:
-                    rm.abort_sub(self)
-                except:
-                    if tb is None:
-                        t, v, tb = sys.exc_info()
-                    self.log.error("Failed to abort_sub resource manager: %s",
-                                   rm, exc_info=sys.exc_info())
+        if self._manager:
+            self._manager.free(self)
+        self._synchronizers.map(lambda s: s.afterCompletion(self))
-        if not subtransaction:
-            if self._manager:
-                self._manager.free(self)
-            self._synchronizers.map(lambda s: s.afterCompletion(self))
-            self.log.debug("abort")
+        self.log.debug("abort")
         if tb is not None:
             raise t, v, tb
@@ -539,8 +495,8 @@
     def sortKey(self):
         return self.manager.sortKey()
-    def tpc_begin(self, txn, sub=False):
-        self.manager.tpc_begin(txn, sub)
+    def tpc_begin(self, txn):
+        self.manager.tpc_begin(txn)
     def tpc_finish(self, txn):
@@ -571,25 +527,6 @@
         if tb is not None:
             raise t, v, tb
-class MultiObjectResourceAdapterSub(MultiObjectResourceAdapter):
-    """Adapt resource managers that participate in subtransactions."""
-    def commit_sub(self, txn):
-        self.manager.commit_sub(txn)
-    def abort_sub(self, txn):
-        self.manager.abort_sub(txn)
-    def tpc_begin(self, txn, sub=False):
-        self.manager.tpc_begin(txn, sub)
-        self.sub = sub
-    def tpc_finish(self, txn):
-        self.manager.tpc_finish(txn)
-        if self.sub:
-            self.objects = []
 def rm_cmp(rm1, rm2):
     return cmp(rm1.sortKey(), rm2.sortKey())
@@ -624,50 +561,82 @@
     def __init__(self, datamanager):
         self._datamanager = datamanager
-        self._rollback = None
     # TODO: I'm not sure why commit() doesn't do anything
     def commit(self, transaction):
+        # We don't do anything here because ZODB4-style data managers
+        # didn't have a separate commit step
     def abort(self, transaction):
-        # We need to discard any changes since the last save point, or all
-        # changes
-        if self._rollback is None:
-            # No previous savepoint, so just abort
-            self._datamanager.abort(transaction)
-        else:
-            self._rollback()
-    def abort_sub(self, transaction):
-    def commit_sub(self, transaction):
-        # Nothing to do wrt data, be we begin 2pc for the top-level
-        # trans
-        self._sub = False
-    def tpc_begin(self, transaction, subtransaction=False):
-        self._sub = subtransaction
+    def tpc_begin(self, transaction):
+        # We don't do anything here because ZODB4-style data managers
+        # didn't have a separate tpc_begin step
+        pass
     def tpc_abort(self, transaction):
-        if self._sub:
-            self.abort(self, transaction)
-        else:
-            self._datamanager.abort(transaction)
+        self._datamanager.abort(transaction)
     def tpc_finish(self, transaction):
-        if self._sub:
-            self._rollback = self._datamanager.savepoint(transaction).rollback
-        else:
-            self._datamanager.commit(transaction)
+        self._datamanager.commit(transaction)
     def tpc_vote(self, transaction):
-        if not self._sub:
-            self._datamanager.prepare(transaction)
+        self._datamanager.prepare(transaction)
     def sortKey(self):
         return self._datamanager.sortKey()
+class Savepoint:
+    """Transaction savepoint
+    Transaction savepoints coordinate savepoints for data managers
+    participating in a transaction.
+    """
+    interface.implements(interfaces.ISavepoint)
+    def __init__(self, optimistic):
+        self._savepoints = []
+        self.valid = True
+        self.next = self.previous = None
+        self.optimistic = optimistic
+    def join(self, datamanager):
+        try:
+            savepoint = datamanager.savepoint
+        except AttributeError:
+            if not self.optimistic:
+                raise TypeError("Savepoints unsupported", datamanager)
+            savepoint = NoRollbackSavepoint(datamanager)
+        else:
+            savepoint = savepoint()
+        self._savepoints.append(savepoint)
+    def rollback(self):
+        if not self.valid:
+            raise interfaces.InvalidSavepointRollbackError
+        self._invalidate_next()
+        for savepoint in self._savepoints:
+            savepoint.rollback()
+    def _invalidate_next(self):
+        self.valid = False
+        if self.next is not None:
+            self.next._invalidate_next()
+    def _invalidate_previous(self):
+        self.valid = False
+        if self.previous is not None:
+            self.previous._invalidate_previous()
+class NoRollbackSavepoint:
+    def __init__(self, datamanager):
+        self.datamanager = datamanager
+    def rollback(self):
+        raise TypeError("Savepoints unsupported", self.datamanager)

Modified: ZODB/branches/3.4/src/transaction/interfaces.py
--- ZODB/branches/3.4/src/transaction/interfaces.py	2005-04-24 01:28:59 UTC (rev 30128)
+++ ZODB/branches/3.4/src/transaction/interfaces.py	2005-04-24 01:29:02 UTC (rev 30129)
@@ -18,158 +18,47 @@
 import zope.interface
-class ISynchronizer(zope.interface.Interface):
-    """Objects that participate in the transaction-boundary notification API.
-    """
+class ITransactionManager(zope.interface.Interface):
+    """An object that manages a sequence of transactions
-    def beforeCompletion(transaction):
-        """Hook that is called by the transaction at the start of a commit."""
-    def afterCompletion(transaction):
-        """Hook that is called by the transaction after completing a commit."""
-class IDataManager(zope.interface.Interface):
-    """Objects that manage transactional storage.
-    These objects may manage data for other objects, or they may manage
-    non-object storages, such as relational databases.
-    IDataManagerOriginal is the interface currently provided by ZODB
-    database connections, but the intent is to move to the newer
-    IDataManager.
+    Applications use transaction managers to establish transaction boundaries.
-    def abort_sub(transaction):
-        """Discard all subtransaction data.
-        See subtransaction.txt
+    def begin():
+        """Begin a new transaction.
-        This is called when top-level transactions are aborted.
-        No further subtransactions can be started once abort_sub()
-        has been called; this is only used when the transaction is
-        being aborted.
-        abort_sub also implies the abort of a 2-phase commit.
-        This should never fail.
+        If an existing transaction is in progress, it will be aborted.
-    def commit_sub(transaction):
-        """Commit all changes made in subtransactions and begin 2-phase commit
-        Data are saved *as if* they are part of the current transaction.
-        That is, they will not be persistent unless the current transaction
-        is committed.
-        This is called when the current top-level transaction is committed.
-        No further subtransactions can be started once commit_sub()
-        has been called; this is only used when the transaction is
-        being committed.
-        This call also implies the beginning of 2-phase commit.
+    def get():
+        """Get the current transaction.
-    # Two-phase commit protocol.  These methods are called by the
-    # ITransaction object associated with the transaction being
-    # committed.
-    def tpc_begin(transaction, subtransaction=False):
-        """Begin commit of a transaction, starting the two-phase commit.
-        transaction is the ITransaction instance associated with the
-        transaction being committed.
-        subtransaction is a Boolean flag indicating whether the
-        two-phase commit is being invoked for a subtransaction.
-        Important note: Subtransactions are modelled in the sense that
-        when you commit a subtransaction, subsequent commits should be
-        for subtransactions as well.  That is, there must be a
-        commit_sub() call between a tpc_begin() call with the
-        subtransaction flag set to true and a tpc_begin() with the
-        flag set to false.
+    def commit():
+        """Commit the current transaction
-    def tpc_abort(transaction):
-        """Abort a transaction.
-        This is called by a transaction manager to end a two-phase commit on
-        the data manager.
-        This is always called after a tpc_begin call.
-        transaction is the ITransaction instance associated with the
-        transaction being committed.
-        This should never fail.
+    def abort(self):
+        """Abort the current transaction
-    def tpc_finish(transaction):
-        """Indicate confirmation that the transaction is done.
+    def registerSynch(synch):
+        """Register an ISynchronizer.
-        transaction is the ITransaction instance associated with the
-        transaction being committed.
-        This should never fail. If this raises an exception, the
-        database is not expected to maintain consistency; it's a
-        serious error.
-        It's important that the storage calls the passed function
-        while it still has its lock.  We don't want another thread
-        to be able to read any updated data until we've had a chance
-        to send an invalidation message to all of the other
-        connections!
+        Synchronizers are notified at the beginning and end of
+        transaction completion.
-    def tpc_vote(transaction):
-        """Verify that a data manager can commit the transaction
+    def unregisterSynch(synch):
+        """Unregister an ISynchronizer.
-        This is the last chance for a data manager to vote 'no'.  A
-        data manager votes 'no' by raising an exception.
-        transaction is the ITransaction instance associated with the
-        transaction being committed.
+        Synchronizers are notified at the beginning and end of
+        transaction completion.
-    def commit(transaction):
-        """Commit modifications to registered objects.
-        Save the object as part of the data to be made persistent if
-        the transaction commits.
-        This includes conflict detection and handling. If no conflicts or
-        errors occur it saves the objects in the storage.
-        """
-    def abort(transaction):
-        """Abort a transaction and forget all changes.
-        Abort must be called outside of a two-phase commit.
-        Abort is called by the transaction manager to abort transactions
-        that are not yet in a two-phase commit.
-        """
-    def sortKey():
-        """Return a key to use for ordering registered DataManagers
-        ZODB uses a global sort order to prevent deadlock when it commits
-        transactions involving multiple resource managers.  The resource
-        manager must define a sortKey() method that provides a global ordering
-        for resource managers.
-        """
-        # Alternate version:
-        #"""Return a consistent sort key for this connection.
-        #
-        #This allows ordering multiple connections that use the same storage in
-        #a consistent manner. This is unique for the lifetime of a connection,
-        #which is good enough to avoid ZEO deadlocks.
-        #"""
 class ITransaction(zope.interface.Interface):
     """Object representing a running transaction.
@@ -219,9 +108,17 @@
     def join(datamanager):
         """Add a datamanager to the transaction.
+        If the data manager supports savepoints, it must call this
+        *before* making any changes.  If the transaction has had any
+        savepoints, then it will take a savepoint of the data manager
+        when join is called and this savepoint must reflect the state
+        of the data manager before any changes that caused the data
+        manager to join the transaction.
         The datamanager must implement the
         transactions.interfaces.IDataManager interface, and be
         adaptable to ZODB.interfaces.IDataManager.
     def note(text):
@@ -293,3 +190,161 @@
     # TODO: deprecate this for 3.6.
     def register(object):
         """Register the given object for transaction control."""
+class IDataManager(zope.interface.Interface):
+    """Objects that manage transactional storage.
+    These objects may manage data for other objects, or they may manage
+    non-object storages, such as relational databases.
+    IDataManagerOriginal is the interface currently provided by ZODB
+    database connections, but the intent is to move to the newer
+    IDataManager.
+    Note that when data are modified, data managers should join a
+    transaction so that data can be committed when the user commits
+    the transaction.
+    """
+    # Two-phase commit protocol.  These methods are called by the
+    # ITransaction object associated with the transaction being
+    # committed.
+    def abort(transaction):
+        """Abort a transaction and forget all changes.
+        Abort must be called outside of a two-phase commit.
+        Abort is called by the transaction manager to abort transactions
+        that are not yet in a two-phase commit.
+        """
+    def tpc_begin(transaction):
+        """Begin commit of a transaction, starting the two-phase commit.
+        transaction is the ITransaction instance associated with the
+        transaction being committed.
+        subtransaction is a Boolean flag indicating whether the
+        two-phase commit is being invoked for a subtransaction.
+        Important note: Subtransactions are modelled in the sense that
+        when you commit a subtransaction, subsequent commits should be
+        for subtransactions as well.  That is, there must be a
+        commit_sub() call between a tpc_begin() call with the
+        subtransaction flag set to true and a tpc_begin() with the
+        flag set to false.
+        """
+    def commit(transaction):
+        """Commit modifications to registered objects.
+        Save the object as part of the data to be made persistent if
+        the transaction commits.
+        This includes conflict detection and handling. If no conflicts or
+        errors occur it saves the objects in the storage.
+        """
+    def tpc_abort(transaction):
+        """Abort a transaction.
+        This is called by a transaction manager to end a two-phase commit on
+        the data manager.
+        This is always called after a tpc_begin call.
+        transaction is the ITransaction instance associated with the
+        transaction being committed.
+        This should never fail.
+        """
+    def tpc_vote(transaction):
+        """Verify that a data manager can commit the transaction
+        This is the last chance for a data manager to vote 'no'.  A
+        data manager votes 'no' by raising an exception.
+        transaction is the ITransaction instance associated with the
+        transaction being committed.
+        """
+    def tpc_finish(transaction):
+        """Indicate confirmation that the transaction is done.
+        transaction is the ITransaction instance associated with the
+        transaction being committed.
+        This should never fail. If this raises an exception, the
+        database is not expected to maintain consistency; it's a
+        serious error.
+        It's important that the storage calls the passed function
+        while it still has its lock.  We don't want another thread
+        to be able to read any updated data until we've had a chance
+        to send an invalidation message to all of the other
+        connections!
+        """
+    def sortKey():
+        """Return a key to use for ordering registered DataManagers
+        ZODB uses a global sort order to prevent deadlock when it commits
+        transactions involving multiple resource managers.  The resource
+        manager must define a sortKey() method that provides a global ordering
+        for resource managers.
+        """
+        # Alternate version:
+        #"""Return a consistent sort key for this connection.
+        #
+        #This allows ordering multiple connections that use the same storage in
+        #a consistent manner. This is unique for the lifetime of a connection,
+        #which is good enough to avoid ZEO deadlocks.
+        #"""
+class ISavepointDataManager(IDataManager):
+    def savepoint():
+        """Return a savepoint (ISavepoint)
+        """
+class ISavepoint(zope.interface.Interface):
+    """A transaction savepoint
+    """
+    def rollback():
+        """Rollback any work done since the savepoint
+        An InvalidSavepointRollbackError is raised if the savepoint
+        isn't valid.
+        """
+    valid = zope.interface.Attribute(
+        "Boolean indicating whether the savepoint is valid")
+class InvalidSavepointRollbackError(Exception):
+    """Attempt to rollback an invalid savepoint
+    A savepoint may be invalid because:
+    - The surrounding transaction has committed or aborted
+    - An earlier savepoint in the same transaction has been rolled back
+    """
+class ISynchronizer(zope.interface.Interface):
+    """Objects that participate in the transaction-boundary notification API.
+    """
+    def beforeCompletion(transaction):
+        """Hook that is called by the transaction at the start of a commit.
+        """
+    def afterCompletion(transaction):
+        """Hook that is called by the transaction after completing a commit.
+        """

Added: ZODB/branches/3.4/src/transaction/savepoint.txt
--- ZODB/branches/3.4/src/transaction/savepoint.txt	2005-04-24 01:28:59 UTC (rev 30128)
+++ ZODB/branches/3.4/src/transaction/savepoint.txt	2005-04-24 01:29:02 UTC (rev 30129)
@@ -0,0 +1,221 @@
+Savepoints provide a way to save to disk intermediate work done during
+a transaction allowing:
+- partial transaction (subtransaction) rollback (abort)
+- state of saved objects to be freed, freeing on-line memory for other
+  uses
+Savepoints make it possible to write atomic subroutines that don't
+make top-level transaction commitments.
+To demonstrate how savepoints work with transactions, we've provided a
+sample data manager implementation that provides savepoint support.
+The primary purpose of this data manager is to provide code that can
+be read to understand how savepoints work. The secondary purpose is to
+provide support for demonstrating the correct operation of savepoint
+support within the transaction system.  This data manager is very
+simple.  It provides flat storage of named immutable values, like strings
+and numbers.
+    >>> import transaction.tests.savepointsample
+    >>> dm = transaction.tests.savepointsample.SampleSavepointDataManager()
+    >>> dm['name'] = 'bob'
+As with other data managers, we can commit changes:
+    >>> transaction.commit()
+    >>> dm['name']
+    'bob'
+and abort changes:
+    >>> dm['name'] = 'sally'
+    >>> dm['name']
+    'sally'
+    >>> transaction.abort()
+    >>> dm['name']
+    'bob'
+Now, let's look at an application that manages funds for people.
+It allows deposits and debits to be entered for multiple people.
+It accepts a sequence of entries and generates a sequence of status
+messages.  For each entry, it applies the change and then validates
+the user's account.  If the user's account is invalid, we roll back
+the change for that entry.  The success or failure of an entry is 
+indicated in the output status. First we'll initialize some accounts:
+    >>> dm['bob-balance'] = 0.0
+    >>> dm['bob-credit'] = 0.0
+    >>> dm['sally-balance'] = 0.0
+    >>> dm['sally-credit'] = 100.0
+    >>> transaction.commit()
+Now, we'll define a validation function to validate an account:
+    >>> def validate_account(name):
+    ...     if dm[name+'-balance'] + dm[name+'-credit'] < 0:
+    ...         raise ValueError('Overdrawn', name)
+And a function to apply entries.  If the function fails in some
+unexpected way, it rolls back all of its changes and
+prints the error:
+    >>> def apply_entries(entries):
+    ...     savepoint = transaction.savepoint()
+    ...     try:
+    ...         for name, amount in entries:
+    ...             entry_savepoint = transaction.savepoint()
+    ...             try:
+    ...                 dm[name+'-balance'] += amount
+    ...                 validate_account(name)
+    ...             except ValueError, error:
+    ...                 entry_savepoint.rollback()
+    ...                 print 'Error', str(error)
+    ...             else:
+    ...                 print 'Updated', name
+    ...     except Exception, error:
+    ...         savepoint.rollback()
+    ...         print 'Unexpected exception', error
+Now let's try applying some entries:
+    >>> apply_entries([
+    ...     ('bob',   10.0),
+    ...     ('sally', 10.0),
+    ...     ('bob',   20.0),
+    ...     ('sally', 10.0),
+    ...     ('bob',   -100.0),
+    ...     ('sally', -100.0),
+    ...     ])
+    Updated bob
+    Updated sally
+    Updated bob
+    Updated sally
+    Error ('Overdrawn', 'bob')
+    Updated sally
+    >>> dm['bob-balance']
+    30.0
+    >>> dm['sally-balance']
+    -80.0
+If we provide entries that cause an unexpected error:
+    >>> apply_entries([
+    ...     ('bob',   10.0),
+    ...     ('sally', 10.0),
+    ...     ('bob',   '20.0'),
+    ...     ('sally', 10.0),
+    ...     ])
+    Updated bob
+    Updated sally
+    Unexpected exception unsupported operand type(s) for +=: 'float' and 'str'
+Because apply_entries used a savepoint for the entire function,
+it was able to roll back the partial changes without rolling back
+changes made in the previous call to apply_entries:
+    >>> dm['bob-balance']
+    30.0
+    >>> dm['sally-balance']
+    -80.0
+If we now abort the outer transaction, the earlier changes will go
+away:
+    >>> transaction.abort()
+    >>> dm['bob-balance']
+    0.0
+    >>> dm['sally-balance']
+    0.0
+Savepoint invalidation
+Once a savepoint has been used, it can't be used again:
+    >>> savepoint = transaction.savepoint()
+    >>> dm['bob-balance'] = 100.0
+    >>> dm['bob-balance']
+    100.0
+    >>> savepoint.rollback()
+    >>> dm['bob-balance']
+    0.0
+    >>> savepoint.rollback()
+    Traceback (most recent call last):
+    ...
+    InvalidSavepointRollbackError
+Using a savepoint also invalidates any savepoints that come after it:
+    >>> savepoint1 = transaction.savepoint()
+    >>> dm['bob-balance'] = 100.0
+    >>> dm['bob-balance']
+    100.0
+    >>> savepoint2 = transaction.savepoint()
+    >>> dm['bob-balance'] = 200.0
+    >>> dm['bob-balance']
+    200.0
+    >>> savepoint1.rollback()
+    >>> dm['bob-balance']
+    0.0
+    >>> savepoint2.rollback()
+    Traceback (most recent call last):
+    ...
+    InvalidSavepointRollbackError
+    >>> transaction.abort()
+Databases without savepoint support
+Normally it's an error to use savepoints with databases that don't
+support savepoints:
+    >>> dm_no_sp = transaction.tests.savepointsample.SampleDataManager()
+    >>> dm_no_sp['name'] = 'bob'
+    >>> transaction.commit()
+    >>> dm_no_sp['name'] = 'sally'
+    >>> savepoint = transaction.savepoint()
+    Traceback (most recent call last):
+    ...
+    TypeError: ('Savepoints unsupported', {'name': 'bob'})
+    >>> transaction.abort()
+However, a flag can be passed to the transaction savepoint method to
+indicate that databases without savepoint support should be tolerated
+until a savepoint is rolled back.  This allows transactions to proceed
+if there are no reasons to roll back:
+    >>> dm_no_sp['name'] = 'sally'
+    >>> savepoint = transaction.savepoint(1)
+    >>> dm_no_sp['name'] = 'sue'
+    >>> transaction.commit()
+    >>> dm_no_sp['name']
+    'sue'
+    >>> savepoint = transaction.savepoint(1)
+    >>> dm_no_sp['name'] = 'sam'
+    >>> savepoint.rollback()
+    Traceback (most recent call last):
+    ...
+    TypeError: ('Savepoints unsupported', {'name': 'sam'})

Property changes on: ZODB/branches/3.4/src/transaction/savepoint.txt
Name: svn:eol-style
   + native

Added: ZODB/branches/3.4/src/transaction/tests/savepointsample.py
--- ZODB/branches/3.4/src/transaction/tests/savepointsample.py	2005-04-24 01:28:59 UTC (rev 30128)
+++ ZODB/branches/3.4/src/transaction/tests/savepointsample.py	2005-04-24 01:29:02 UTC (rev 30129)
@@ -0,0 +1,174 @@
+# Copyright (c) 2004 Zope Corporation and Contributors.
+# All Rights Reserved.
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
+"""Savepoint data manager implementation example
+Sample data manager implementation that illustrates how to implement savepoints
+See savepoint.txt in the transaction package.
+"""
+import UserDict
+from zope import interface
+import transaction.interfaces
+class SampleDataManager(UserDict.DictMixin):
+    """Sample implementation of data manager that doesn't support savepoints
+    This data manager stores named simple values, like strings and numbers.
+    """
+    interface.implements(transaction.interfaces.ISavepointDataManager)
+    def __init__(self, transaction_manager = None):
+        if transaction_manager is None:
+            # Use the thread-local transaction manager if none is provided:
+            transaction_manager = transaction.manager
+        self.transaction_manager = transaction_manager
+        # Our committed and uncommitted data:
+        self.committed = {}
+        self.uncommitted = self.committed.copy()
+        # Our transaction state:
+        #   If our uncommitted data is modified, we'll join a transaction
+        #   and keep track of the transaction we joined.  Any commit
+        #   related messages we get should be for this same transaction
+        self.transaction = None
+        #   What phase, if any, of two-phase commit we are in:
+        self.tpc_phase = None
+    #######################################################################
+    # Provide a mapping interface to uncommitted data.  We provide
+    # a basic subset of the interface. DictMixin does the rest.
+    def __getitem__(self, name):
+        return self.uncommitted[name]
+    def __setitem__(self, name, value):
+        self._join() # join the current transaction, if we haven't already
+        self.uncommitted[name] = value
+    def __delitem__(self, name):
+        self._join() # join the current transaction, if we haven't already
+        del self.uncommitted[name]
+    def keys(self):
+        return self.uncommitted.keys()
+    #
+    #######################################################################
+    #######################################################################
+    # Transaction methods
+    def _join(self):
+        # If this is the first change in the transaction, join the transaction
+        if self.transaction is None:
+            self.transaction = self.transaction_manager.get()
+            self.transaction.join(self)
+    def _resetTransaction(self):
+        self.transaction = None
+        self.tpc_phase = None
+    def abort(self, transaction):
+        """Throw away changes made before the commit process has started
+        """
+        assert ((transaction is self.transaction) or (self.transaction is None)
+                ), "Must not change transactions"
+        assert self.tpc_phase is None, "Must be called outside of tpc"
+        self.uncommitted = self.committed.copy()
+        self._resetTransaction()
+    def tpc_begin(self, transaction):
+        """Enter two-phase commit
+        """
+        assert transaction is self.transaction, "Must not change transactions"
+        assert self.tpc_phase is None, "Must be called outside of tpc"
+        self.tpc_phase = 1
+    def commit(self, transaction):
+        """Record data modified during the transaction
+        """
+        assert transaction is self.transaction, "Must not change transactions"
+        assert self.tpc_phase == 1, "Must be called in first phase of tpc"
+        # In our simple example, we don't need to do anything.
+        # A more complex data manager would typically write to some sort
+        # of log.
+    def tpc_vote(self, transaction):
+        assert transaction is self.transaction, "Must not change transactions"
+        assert self.tpc_phase == 1, "Must be called in first phase of tpc"
+        # This particular data manager is always ready to vote.
+        # Real data managers will usually need to take some steps to
+        # make sure that the finish will succeed
+        self.tpc_phase = 2
+    def tpc_finish(self, transaction):
+        assert transaction is self.transaction, "Must not change transactions"
+        assert self.tpc_phase == 2, "Must be called in second phase of tpc"
+        self.committed = self.uncommitted.copy()
+        self._resetTransaction()
+    def tpc_abort(self, transaction):
+        assert transaction is self.transaction, "Must not change transactions"
+        assert self.tpc_phase is not None, "Must be called inside of tpc"
+        self.uncommitted = self.committed.copy()
+        self._resetTransaction()
+    #
+    #######################################################################
+    #######################################################################
+    # Other data manager methods
+    def sortKey(self):
+        # Commit operations on multiple data managers are performed in
+        # sort key order.  This is important to avoid deadlock when data
+        # managers are shared among multiple threads or processes and
+        # use locks to manage that sharing.  We aren't going to bother
+        # with that here.
+        return str(id(self))
+    #
+    #######################################################################
+class SampleSavepointDataManager(SampleDataManager):
+    """Sample implementation of a savepoint-supporting data manager
+    This extends the basic data manager with savepoint support.
+    """
+    def savepoint(self):
+        # When we create the savepoint, we save the existing database state
+        return SampleSavepoint(self, self.uncommitted.copy())
+    def _rollback_savepoint(self, savepoint):
+        # when we rollback the savepoint, we restore the saved data
+        self.uncommitted = savepoint.data
+class SampleSavepoint:
+    def __init__(self, data_manager, data):
+        self.data_manager = data_manager
+        self.data = data
+    def rollback(self):
+        self.data_manager._rollback_savepoint(self)

Property changes on: ZODB/branches/3.4/src/transaction/tests/savepointsample.py
Name: svn:keywords
   + Id
Name: svn:eol-style
   + native

Modified: ZODB/branches/3.4/src/transaction/tests/test_register_compat.py
--- ZODB/branches/3.4/src/transaction/tests/test_register_compat.py	2005-04-24 01:28:59 UTC (rev 30128)
+++ ZODB/branches/3.4/src/transaction/tests/test_register_compat.py	2005-04-24 01:29:02 UTC (rev 30129)
@@ -128,7 +128,7 @@
     def sortKey(self):
         return str(id(self))
-    def tpc_begin(self, txn, sub):
+    def tpc_begin(self, txn):
     def tpc_vote(self, txn):

Added: ZODB/branches/3.4/src/transaction/tests/test_savepoint.py
--- ZODB/branches/3.4/src/transaction/tests/test_savepoint.py	2005-04-24 01:28:59 UTC (rev 30128)
+++ ZODB/branches/3.4/src/transaction/tests/test_savepoint.py	2005-04-24 01:29:02 UTC (rev 30129)
@@ -0,0 +1,29 @@
+# Copyright (c) 2004 Zope Corporation and Contributors.
+# All Rights Reserved.
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
+"""Tests of savepoint feature
+"""
+import unittest
+from zope.testing import doctest
+def test_suite():
+    return unittest.TestSuite((
+        doctest.DocFileSuite('../savepoint.txt'),
+        ))
+if __name__ == '__main__':
+    unittest.main(defaultTest='test_suite')

Property changes on: ZODB/branches/3.4/src/transaction/tests/test_savepoint.py
Name: svn:keywords
   + Id
Name: svn:eol-style
   + native

More information about the Zodb-checkins mailing list