[Zope-Checkins] CVS: Zope3/src/ZODB - Transaction.py:1.59 TmpStore.py:1.12 MappingStorage.py:1.13 DB.py:1.72 Connection.py:1.141

Jeremy Hylton jeremy at zope.com
Wed Mar 31 22:57:29 EST 2004


Update of /cvs-repository/Zope3/src/ZODB
In directory cvs.zope.org:/tmp/cvs-serv23866/src/ZODB

Modified Files:
	Transaction.py TmpStore.py MappingStorage.py DB.py 
	Connection.py 
Log Message:
Merge the jeremy-txn-branch to the head.

This branch introduces a new transaction API.  The key features are:
  - top-level functions in transaction -- get(), commit(), abort() 
  - explicit transaction manager objects
  - Transaction objects are used for exactly one transaction
  - support for transaction synchronizers

The changes here are still provisional, but we want to get them off an
obscure branch and onto the head for further development.


=== Zope3/src/ZODB/Transaction.py 1.58 => 1.59 ===
--- Zope3/src/ZODB/Transaction.py:1.58	Thu Feb 26 19:31:53 2004
+++ Zope3/src/ZODB/Transaction.py	Wed Mar 31 22:56:58 2004
@@ -64,6 +64,7 @@
         self._id=id
         self._objects=[]
         self._append=self._objects.append
+        raise RuntimeError
 
     def _init(self):
         self._objects=[]
@@ -532,25 +533,27 @@
 ############################################################################
 # install get_transaction:
 
-# Map thread ident to its Transaction instance.
-_tid2tran = {}
+### Map thread ident to its Transaction instance.
+##_tid2tran = {}
 
-# Get Transaction associated with current thread; if none, create a
-# new Transaction and return it.
-def get_transaction():
-    tid = _get_ident()
-    result = _tid2tran.get(tid)
-    if result is None:
-        _tid2tran[tid] = result = Transaction(tid)
-    return result
-
-# Forget whatever Transaction (if any) is associated with current thread.
-def free_transaction():
-    tid = _get_ident()
-    try:
-        del _tid2tran[tid]
-    except KeyError:
-        pass
+### Get Transaction associated with current thread; if none, create a
+### new Transaction and return it.
+##def get_transaction():
+##    tid = _get_ident()
+##    result = _tid2tran.get(tid)
+##    if result is None:
+##        _tid2tran[tid] = result = Transaction(tid)
+##    return result
+
+### Forget whatever Transaction (if any) is associated with current thread.
+##def free_transaction():
+##    tid = _get_ident()
+##    try:
+##        del _tid2tran[tid]
+##    except KeyError:
+##        pass
+
+from transaction import get as get_transaction
 
 import __builtin__
 __builtin__.get_transaction = get_transaction


=== Zope3/src/ZODB/TmpStore.py 1.11 => 1.12 ===
--- Zope3/src/ZODB/TmpStore.py:1.11	Thu Oct  2 14:17:19 2003
+++ Zope3/src/ZODB/TmpStore.py	Wed Mar 31 22:56:58 2004
@@ -22,8 +22,9 @@
 
     _bver = ''
 
-    def __init__(self, base_version):
+    def __init__(self, base_version, storage):
         self._transaction = None
+        self._storage = storage
         if base_version:
             self._bver = base_version
         self._file = tempfile.TemporaryFile()
@@ -34,14 +35,13 @@
         self._index = {}
         # _tindex: map oid to pos for new updates
         self._tindex = {}
-        self._db = None
         self._creating = []
 
     def close(self):
         self._file.close()
 
     def getName(self):
-        return self._db.getName()
+        return self._storage.getName()
 
     def getSize(self):
         return self._pos
@@ -66,14 +66,13 @@
     def modifiedInVersion(self, oid):
         if self._index.has_key(oid):
             return self._bver
-        return self._db._storage.modifiedInVersion(oid)
+        return self._storage.modifiedInVersion(oid)
 
     def new_oid(self):
-        return self._db._storage.new_oid()
+        return self._storage.new_oid()
 
     def registerDB(self, db, limit):
-        self._db = db
-        self._storage = db._storage
+        pass
 
     def store(self, oid, serial, data, version, transaction):
         if transaction is not self._transaction:


=== Zope3/src/ZODB/MappingStorage.py 1.12 => 1.13 ===
--- Zope3/src/ZODB/MappingStorage.py:1.12	Thu Mar 11 15:10:55 2004
+++ Zope3/src/ZODB/MappingStorage.py	Wed Mar 31 22:56:58 2004
@@ -68,6 +68,16 @@
         finally:
             self._lock_release()
 
+    def getTid(self, oid):
+        self._lock_acquire()
+        try:
+            # The tid is the first 8 bytes of the buffer.
+            s = self._index[oid]
+            return s[:8]
+        finally:
+            self._lock_release()
+        
+
     def store(self, oid, serial, data, version, transaction):
         if transaction is not self._transaction:
             raise POSException.StorageTransactionError(self, transaction)


=== Zope3/src/ZODB/DB.py 1.71 => 1.72 ===
--- Zope3/src/ZODB/DB.py:1.71	Tue Mar 16 11:28:19 2004
+++ Zope3/src/ZODB/DB.py	Wed Mar 31 22:56:58 2004
@@ -23,9 +23,10 @@
 from ZODB.broken import find_global
 from ZODB.Connection import Connection
 from ZODB.serialize import referencesf
-from ZODB.Transaction import Transaction, get_transaction
 from zLOG import LOG, ERROR
 
+import transaction
+
 class DB(object):
     """The Object Database
     -------------------
@@ -132,7 +133,7 @@
             p = cPickle.Pickler(file, 1)
             p.dump((root.__class__, None))
             p.dump(root.__getstate__())
-            t = Transaction()
+            t = transaction.Transaction()
             t.description = 'initial database creation'
             storage.tpc_begin(t)
             storage.store('\0\0\0\0\0\0\0\0', None, file.getvalue(), '', t)
@@ -140,13 +141,12 @@
             storage.tpc_finish(t)
 
         # Pass through methods:
-        for m in ('history',
-                  'supportsUndo', 'supportsVersions', 'undoLog',
-                  'versionEmpty', 'versions'):
+        for m in ['history', 'supportsUndo', 'supportsVersions', 'undoLog',
+                  'versionEmpty', 'versions']:
             setattr(self, m, getattr(storage, m))
 
         if hasattr(storage, 'undoInfo'):
-            self.undoInfo=storage.undoInfo
+            self.undoInfo = storage.undoInfo
 
 
     def _cacheMean(self, attr):
@@ -206,10 +206,10 @@
                 self._temps=t
         finally: self._r()
 
-    def abortVersion(self, version, transaction=None):
-        if transaction is None:
-            transaction = get_transaction()
-        transaction.register(AbortVersion(self, version))
+    def abortVersion(self, version, txn=None):
+        if txn is None:
+            txn = transaction.get()
+        txn.register(AbortVersion(self, version))
 
     def cacheDetail(self):
         """Return information on objects in the various caches
@@ -316,10 +316,10 @@
         """
         self._storage.close()
 
-    def commitVersion(self, source, destination='', transaction=None):
-        if transaction is None:
-            transaction = get_transaction()
-        transaction.register(CommitVersion(self, source, destination))
+    def commitVersion(self, source, destination='', txn=None):
+        if txn is None:
+            txn = transaction.get()
+        txn.register(CommitVersion(self, source, destination))
 
     def getCacheSize(self):
         return self._cache_size
@@ -391,7 +391,7 @@
         return len(self._storage)
 
     def open(self, version='', transaction=None, temporary=0, force=None,
-             waitflag=1, mvcc=True):
+             waitflag=1, mvcc=True, txn_mgr=None):
         """Return a database Connection for use by application code.
 
         The optional version argument can be used to specify that a
@@ -424,7 +424,7 @@
                 # a one-use connection.
                 c = self.klass(version=version,
                                cache_size=self._version_cache_size,
-                               mvcc=mvcc)
+                               mvcc=mvcc, txn_mgr=txn_mgr)
                 c._setDB(self)
                 self._temps.append(c)
                 if transaction is not None:
@@ -474,13 +474,13 @@
                     if self._version_pool_size > len(allocated) or force:
                         c = self.klass(version=version,
                                        cache_size=self._version_cache_size,
-                                       mvcc=mvcc)
+                                       mvcc=mvcc, txn_mgr=txn_mgr)
                         allocated.append(c)
                         pool.append(c)
                 elif self._pool_size > len(allocated) or force:
                     c = self.klass(version=version,
                                    cache_size=self._cache_size,
-                                   mvcc=mvcc)
+                                   mvcc=mvcc, txn_mgr=txn_mgr)
                     allocated.append(c)
                     pool.append(c)
 
@@ -611,7 +611,7 @@
 
     def cacheStatistics(self): return () # :(
 
-    def undo(self, id, transaction=None):
+    def undo(self, id, txn=None):
         """Undo a transaction identified by id.
 
         A transaction can be undone if all of the objects involved in
@@ -625,12 +625,12 @@
 
         :Parameters:
           - `id`: a storage-specific transaction identifier
-          - `transaction`: transaction context to use for undo().
+          - `txn`: transaction context to use for undo().
             By default, uses the current transaction.
         """
-        if transaction is None:
-            transaction = get_transaction()
-        transaction.register(TransactionalUndo(self, id))
+        if txn is None:
+            txn = transaction.get()
+        txn.register(TransactionalUndo(self, id))
 
     def versionEmpty(self, version):
         return self._storage.versionEmpty(version)
@@ -663,7 +663,6 @@
     def __init__(self, db):
         self._db = db
         # Delegate the actual 2PC methods to the storage
-        self.tpc_begin = self._db._storage.tpc_begin
         self.tpc_vote = self._db._storage.tpc_vote
         self.tpc_finish = self._db._storage.tpc_finish
         self.tpc_abort = self._db._storage.tpc_abort
@@ -671,13 +670,19 @@
     def sortKey(self):
         return "%s:%s" % (self._db._storage.sortKey(), id(self))
 
+    def tpc_begin(self, txn, sub=False):
+        # XXX we should never be called with sub=True.
+        if sub:
+            raise ValueError, "doesn't supoprt sub-transactions"
+        self._db._storage.tpc_begin(txn)
+
     # The object registers itself with the txn manager, so the ob
     # argument to the methods below is self.
 
-    def abort(self, ob, t):
+    def abort(self, obj, txn):
         pass
 
-    def commit(self, ob, t):
+    def commit(self, obj, txn):
         pass
 
 class CommitVersion(ResourceManager):


=== Zope3/src/ZODB/Connection.py 1.140 => 1.141 ===
--- Zope3/src/ZODB/Connection.py:1.140	Tue Mar 16 11:18:20 2004
+++ Zope3/src/ZODB/Connection.py	Wed Mar 31 22:56:58 2004
@@ -26,12 +26,13 @@
 from persistent import PickleCache
 from persistent.interfaces import IPersistent
 
+import transaction
+
 from ZODB.ConflictResolution import ResolvedSerial
 from ZODB.ExportImport import ExportImport
 from ZODB.POSException \
      import ConflictError, ReadConflictError, InvalidObjectReference
 from ZODB.TmpStore import TmpStore
-from ZODB.Transaction import Transaction, get_transaction
 from ZODB.utils import oid_repr, z64
 from ZODB.serialize import ObjectWriter, ConnectionObjectReader, myhasattr
 
@@ -79,7 +80,7 @@
 
     In many applications, root() is the only method of the Connection
     that you will need to use.
-    
+
     Synchronization
     ---------------
 
@@ -104,7 +105,7 @@
     XXX Mention the database pool.
 
     :Groups:
-    
+
       - `User Methods`: root, get, add, close, db, sync, isReadOnly,
         cacheGC, cacheFullSweep, cacheMinimize, getVersion,
         modifiedInVersion
@@ -123,10 +124,9 @@
 
     _tmp = None
     _code_timestamp = 0
-    _transaction = None
 
     def __init__(self, version='', cache_size=400,
-                 cache_deactivate_after=None, mvcc=True):
+                 cache_deactivate_after=None, mvcc=True, txn_mgr=None):
         """Create a new Connection.
 
         A Connection instance should by instantiated by the DB
@@ -153,6 +153,13 @@
         self._reset_counter = global_reset_counter
         self._load_count = 0   # Number of objects unghosted
         self._store_count = 0  # Number of objects stored
+        self._modified = []
+
+        # If a transaction manager is passed to the constructor, use
+        # it instead of the global transaction manager.  The instance
+        # variable will hold either a TM class or the transaction
+        # module itself, which implements the same interface.
+        self._txn_mgr = txn_mgr or transaction
 
         # _invalidated queues invalidate messages delivered from the DB
         # _inv_lock prevents one thread from modifying the set while
@@ -187,21 +194,18 @@
         self._import = None
 
     def getTransaction(self):
-        t = self._transaction
-        if t is None:
-            # Fall back to thread-bound transactions
-            t = get_transaction()
-        return t
+        # XXX mark this as deprecated?
+        return self._txn_mgr.get()
 
     def setLocalTransaction(self):
         """Use a transaction bound to the connection rather than the thread"""
 
-        # XXX mention that this should only be called when you open
-        # a connection or at transaction boundaries (but the lattter are
-        # hard to be sure about).
-        if self._transaction is None:
-            self._transaction = Transaction()
-        return self._transaction
+        # XXX mark this method as depcrecated?  note that it's
+        # signature changed?
+
+        if self._txn_mgr is transaction:
+            self._txn_mgr = transaction.TransactionManager()
+        return self._txn_mgr
 
     def _cache_items(self):
         # find all items on the lru list
@@ -248,7 +252,7 @@
         if self._storage is None:
             # XXX Should this be a ZODB-specific exception?
             raise RuntimeError("The database connection is closed")
-        
+
         obj = self._cache.get(oid, None)
         if obj is not None:
             return obj
@@ -297,7 +301,7 @@
         if self._storage is None:
             # XXX Should this be a ZODB-specific exception?
             raise RuntimeError("The database connection is closed")
-        
+
         marker = object()
         oid = getattr(obj, "_p_oid", marker)
         if oid is marker:
@@ -307,10 +311,13 @@
             assert obj._p_oid is None
             oid = obj._p_oid = self._storage.new_oid()
             obj._p_jar = self
-            self._added[oid] = obj
             if self._added_during_commit is not None:
                 self._added_during_commit.append(obj)
-            self.getTransaction().register(obj)
+            self._txn_mgr.get().register(obj)
+            # Add to _added after calling register(), so that _added
+            # can be used as a test for whether the object has been
+            # registered with the transaction.
+            self._added[oid] = obj
         elif obj._p_jar is not self:
             raise InvalidObjectReference(obj, obj._p_jar)
 
@@ -328,7 +335,7 @@
         was closed will be processed.
 
         If resetCaches() was called, the cache will be cleared.
-        
+
         :Parameters:
           - `odb`: that database that owns the Connection
         """
@@ -337,7 +344,7 @@
         # other attributes on open and clearing them on close?
         # A Connection is only ever associated with a single DB
         # and Storage.
-        
+
         self._db = odb
         self._storage = odb._storage
         self._sortKey = odb._storage.sortKey
@@ -409,7 +416,7 @@
 
     def cacheGC(self):
         """Reduce cache size to target size.
-        
+
         Call _p_deactivate() on cached objects until the cache size
         falls under the target size.
         """
@@ -453,7 +460,7 @@
                 except: # except what?
                     f = getattr(f, 'im_self', f)
                     self._log.error("Close callback failed for %s", f,
-                                    sys.exc_info())
+                                    exc_info=sys.exc_info())
             self.__onCloseCallbacks = None
         self._storage = self._tmp = self.new_oid = None
         self._debug_info = ()
@@ -472,7 +479,6 @@
 
         oid = obj._p_oid
         if oid in self._conflicts:
-            self.getTransaction().register(obj)
             raise ReadConflictError(object=obj)
 
         if oid is None or obj._p_jar is not self:
@@ -537,7 +543,7 @@
         src = self._storage
         self._storage = self._tmp
         self._tmp = None
-        
+
         self._log.debug("Commiting subtransaction of size %s", src.getSize())
         oids = src._index.keys()
         self._storage.tpc_begin(t)
@@ -603,7 +609,7 @@
         :Parameters:
           - `tid`: the storage-level id of the transaction that committed
           - `oids`: oids is a set of oids, represented as a dict with oids
-            as keys. 
+            as keys.
         """
         self._inv_lock.acquire()
         try:
@@ -643,14 +649,17 @@
             # There is some old Zope code that assigns _p_jar
             # directly.  That is no longer allowed, but we need to
             # provide support for old code that still does it.
-            
+
             # XXX The actual complaint here is that an object without
             # an oid is being registered.  I can't think of any way to
             # achieve that without assignment to _p_jar.  If there is
             # a way, this will be a very confusing warning.
             warnings.warn("Assigning to _p_jar is deprecated",
                           DeprecationWarning)
-        self.getTransaction().register(obj)
+        elif obj._p_oid in self._added:
+            # It was registered before it was added to _added.
+            return
+        self._txn_mgr.get().register(obj)
 
     def root(self):
         """Return the database root object.
@@ -729,7 +738,7 @@
         """Load non-current state for obj or raise ReadConflictError."""
 
         if not (self._mvcc and self._setstate_noncurrent(obj)):
-            self.getTransaction().register(obj)
+            self._txn_mgr.get().register(obj)
             self._conflicts[obj._p_oid] = True
             raise ReadConflictError(object=obj)
 
@@ -771,7 +780,7 @@
             finally:
                 self._inv_lock.release()
         else:
-            self.getTransaction().register(obj)
+            self._txn_mgr.get().register(obj)
             raise ReadConflictError(object=obj)
 
     def oldstate(self, obj, tid):
@@ -782,7 +791,7 @@
         to other peristent objects are handled.
 
         :return: a persistent object
-        
+
         :Parameters:
           - `obj`: a persistent object from this Connection.
           - `tid`: id of a transaction that wrote an earlier revision.
@@ -829,19 +838,16 @@
             del obj._p_oid
             del obj._p_jar
 
-    def tpc_begin(self, transaction, sub=None):
+    def tpc_begin(self, transaction, sub=False):
         self._modified = []
-        
+
         # _creating is a list of oids of new objects, which is used to
         # remove them from the cache if a transaction aborts.
         self._creating = []
-        if sub:
+        if sub and self._tmp is None:
             # Sub-transaction!
-            if self._tmp is None:
-                _tmp = TmpStore(self._version)
-                self._tmp = self._storage
-                self._storage = _tmp
-                _tmp.registerDB(self._db, 0)
+            self._tmp = self._storage
+            self._storage = TmpStore(self._version, self._storage)
 
         self._storage.tpc_begin(transaction)
 
@@ -920,7 +926,7 @@
         self._flush_invalidations()
 
     def sync(self):
-        self.getTransaction().abort()
+        self._txn_mgr.get().abort()
         sync = getattr(self._storage, 'sync', 0)
         if sync:
             sync()
@@ -949,5 +955,5 @@
         new._p_oid = oid
         new._p_jar = self
         new._p_changed = 1
-        self.getTransaction().register(new)
+        self._txn_mgr.get().register(new)
         self._cache[oid] = new




More information about the Zope-Checkins mailing list