[Zope-Checkins] CVS: Zope3/src/ZODB - Transaction.py:1.59
TmpStore.py:1.12 MappingStorage.py:1.13 DB.py:1.72
Connection.py:1.141
Jeremy Hylton
jeremy at zope.com
Wed Mar 31 22:57:29 EST 2004
Update of /cvs-repository/Zope3/src/ZODB
In directory cvs.zope.org:/tmp/cvs-serv23866/src/ZODB
Modified Files:
Transaction.py TmpStore.py MappingStorage.py DB.py
Connection.py
Log Message:
Merge the jeremy-txn-branch to the head.
This branch introduces a new transaction API. The key features are:
- top-level functions in transaction -- get(), commit(), abort()
- explicit transaction manager objects
- Transaction objects are used for exactly one transaction
- support for transaction synchronizers
The changes here are still provisional, but we want to get them off an
obscure branch and onto the head for further development.
=== Zope3/src/ZODB/Transaction.py 1.58 => 1.59 ===
--- Zope3/src/ZODB/Transaction.py:1.58 Thu Feb 26 19:31:53 2004
+++ Zope3/src/ZODB/Transaction.py Wed Mar 31 22:56:58 2004
@@ -64,6 +64,7 @@
self._id=id
self._objects=[]
self._append=self._objects.append
+ raise RuntimeError
def _init(self):
self._objects=[]
@@ -532,25 +533,27 @@
############################################################################
# install get_transaction:
-# Map thread ident to its Transaction instance.
-_tid2tran = {}
+### Map thread ident to its Transaction instance.
+##_tid2tran = {}
-# Get Transaction associated with current thread; if none, create a
-# new Transaction and return it.
-def get_transaction():
- tid = _get_ident()
- result = _tid2tran.get(tid)
- if result is None:
- _tid2tran[tid] = result = Transaction(tid)
- return result
-
-# Forget whatever Transaction (if any) is associated with current thread.
-def free_transaction():
- tid = _get_ident()
- try:
- del _tid2tran[tid]
- except KeyError:
- pass
+### Get Transaction associated with current thread; if none, create a
+### new Transaction and return it.
+##def get_transaction():
+## tid = _get_ident()
+## result = _tid2tran.get(tid)
+## if result is None:
+## _tid2tran[tid] = result = Transaction(tid)
+## return result
+
+### Forget whatever Transaction (if any) is associated with current thread.
+##def free_transaction():
+## tid = _get_ident()
+## try:
+## del _tid2tran[tid]
+## except KeyError:
+## pass
+
+from transaction import get as get_transaction
import __builtin__
__builtin__.get_transaction = get_transaction
=== Zope3/src/ZODB/TmpStore.py 1.11 => 1.12 ===
--- Zope3/src/ZODB/TmpStore.py:1.11 Thu Oct 2 14:17:19 2003
+++ Zope3/src/ZODB/TmpStore.py Wed Mar 31 22:56:58 2004
@@ -22,8 +22,9 @@
_bver = ''
- def __init__(self, base_version):
+ def __init__(self, base_version, storage):
self._transaction = None
+ self._storage = storage
if base_version:
self._bver = base_version
self._file = tempfile.TemporaryFile()
@@ -34,14 +35,13 @@
self._index = {}
# _tindex: map oid to pos for new updates
self._tindex = {}
- self._db = None
self._creating = []
def close(self):
self._file.close()
def getName(self):
- return self._db.getName()
+ return self._storage.getName()
def getSize(self):
return self._pos
@@ -66,14 +66,13 @@
def modifiedInVersion(self, oid):
if self._index.has_key(oid):
return self._bver
- return self._db._storage.modifiedInVersion(oid)
+ return self._storage.modifiedInVersion(oid)
def new_oid(self):
- return self._db._storage.new_oid()
+ return self._storage.new_oid()
def registerDB(self, db, limit):
- self._db = db
- self._storage = db._storage
+ pass
def store(self, oid, serial, data, version, transaction):
if transaction is not self._transaction:
=== Zope3/src/ZODB/MappingStorage.py 1.12 => 1.13 ===
--- Zope3/src/ZODB/MappingStorage.py:1.12 Thu Mar 11 15:10:55 2004
+++ Zope3/src/ZODB/MappingStorage.py Wed Mar 31 22:56:58 2004
@@ -68,6 +68,16 @@
finally:
self._lock_release()
+ def getTid(self, oid):
+ self._lock_acquire()
+ try:
+ # The tid is the first 8 bytes of the buffer.
+ s = self._index[oid]
+ return s[:8]
+ finally:
+ self._lock_release()
+
+
def store(self, oid, serial, data, version, transaction):
if transaction is not self._transaction:
raise POSException.StorageTransactionError(self, transaction)
=== Zope3/src/ZODB/DB.py 1.71 => 1.72 ===
--- Zope3/src/ZODB/DB.py:1.71 Tue Mar 16 11:28:19 2004
+++ Zope3/src/ZODB/DB.py Wed Mar 31 22:56:58 2004
@@ -23,9 +23,10 @@
from ZODB.broken import find_global
from ZODB.Connection import Connection
from ZODB.serialize import referencesf
-from ZODB.Transaction import Transaction, get_transaction
from zLOG import LOG, ERROR
+import transaction
+
class DB(object):
"""The Object Database
-------------------
@@ -132,7 +133,7 @@
p = cPickle.Pickler(file, 1)
p.dump((root.__class__, None))
p.dump(root.__getstate__())
- t = Transaction()
+ t = transaction.Transaction()
t.description = 'initial database creation'
storage.tpc_begin(t)
storage.store('\0\0\0\0\0\0\0\0', None, file.getvalue(), '', t)
@@ -140,13 +141,12 @@
storage.tpc_finish(t)
# Pass through methods:
- for m in ('history',
- 'supportsUndo', 'supportsVersions', 'undoLog',
- 'versionEmpty', 'versions'):
+ for m in ['history', 'supportsUndo', 'supportsVersions', 'undoLog',
+ 'versionEmpty', 'versions']:
setattr(self, m, getattr(storage, m))
if hasattr(storage, 'undoInfo'):
- self.undoInfo=storage.undoInfo
+ self.undoInfo = storage.undoInfo
def _cacheMean(self, attr):
@@ -206,10 +206,10 @@
self._temps=t
finally: self._r()
- def abortVersion(self, version, transaction=None):
- if transaction is None:
- transaction = get_transaction()
- transaction.register(AbortVersion(self, version))
+ def abortVersion(self, version, txn=None):
+ if txn is None:
+ txn = transaction.get()
+ txn.register(AbortVersion(self, version))
def cacheDetail(self):
"""Return information on objects in the various caches
@@ -316,10 +316,10 @@
"""
self._storage.close()
- def commitVersion(self, source, destination='', transaction=None):
- if transaction is None:
- transaction = get_transaction()
- transaction.register(CommitVersion(self, source, destination))
+ def commitVersion(self, source, destination='', txn=None):
+ if txn is None:
+ txn = transaction.get()
+ txn.register(CommitVersion(self, source, destination))
def getCacheSize(self):
return self._cache_size
@@ -391,7 +391,7 @@
return len(self._storage)
def open(self, version='', transaction=None, temporary=0, force=None,
- waitflag=1, mvcc=True):
+ waitflag=1, mvcc=True, txn_mgr=None):
"""Return a database Connection for use by application code.
The optional version argument can be used to specify that a
@@ -424,7 +424,7 @@
# a one-use connection.
c = self.klass(version=version,
cache_size=self._version_cache_size,
- mvcc=mvcc)
+ mvcc=mvcc, txn_mgr=txn_mgr)
c._setDB(self)
self._temps.append(c)
if transaction is not None:
@@ -474,13 +474,13 @@
if self._version_pool_size > len(allocated) or force:
c = self.klass(version=version,
cache_size=self._version_cache_size,
- mvcc=mvcc)
+ mvcc=mvcc, txn_mgr=txn_mgr)
allocated.append(c)
pool.append(c)
elif self._pool_size > len(allocated) or force:
c = self.klass(version=version,
cache_size=self._cache_size,
- mvcc=mvcc)
+ mvcc=mvcc, txn_mgr=txn_mgr)
allocated.append(c)
pool.append(c)
@@ -611,7 +611,7 @@
def cacheStatistics(self): return () # :(
- def undo(self, id, transaction=None):
+ def undo(self, id, txn=None):
"""Undo a transaction identified by id.
A transaction can be undone if all of the objects involved in
@@ -625,12 +625,12 @@
:Parameters:
- `id`: a storage-specific transaction identifier
- - `transaction`: transaction context to use for undo().
+ - `txn`: transaction context to use for undo().
By default, uses the current transaction.
"""
- if transaction is None:
- transaction = get_transaction()
- transaction.register(TransactionalUndo(self, id))
+ if txn is None:
+ txn = transaction.get()
+ txn.register(TransactionalUndo(self, id))
def versionEmpty(self, version):
return self._storage.versionEmpty(version)
@@ -663,7 +663,6 @@
def __init__(self, db):
self._db = db
# Delegate the actual 2PC methods to the storage
- self.tpc_begin = self._db._storage.tpc_begin
self.tpc_vote = self._db._storage.tpc_vote
self.tpc_finish = self._db._storage.tpc_finish
self.tpc_abort = self._db._storage.tpc_abort
@@ -671,13 +670,19 @@
def sortKey(self):
return "%s:%s" % (self._db._storage.sortKey(), id(self))
+ def tpc_begin(self, txn, sub=False):
+ # XXX we should never be called with sub=True.
+ if sub:
+ raise ValueError, "doesn't supoprt sub-transactions"
+ self._db._storage.tpc_begin(txn)
+
# The object registers itself with the txn manager, so the ob
# argument to the methods below is self.
- def abort(self, ob, t):
+ def abort(self, obj, txn):
pass
- def commit(self, ob, t):
+ def commit(self, obj, txn):
pass
class CommitVersion(ResourceManager):
=== Zope3/src/ZODB/Connection.py 1.140 => 1.141 ===
--- Zope3/src/ZODB/Connection.py:1.140 Tue Mar 16 11:18:20 2004
+++ Zope3/src/ZODB/Connection.py Wed Mar 31 22:56:58 2004
@@ -26,12 +26,13 @@
from persistent import PickleCache
from persistent.interfaces import IPersistent
+import transaction
+
from ZODB.ConflictResolution import ResolvedSerial
from ZODB.ExportImport import ExportImport
from ZODB.POSException \
import ConflictError, ReadConflictError, InvalidObjectReference
from ZODB.TmpStore import TmpStore
-from ZODB.Transaction import Transaction, get_transaction
from ZODB.utils import oid_repr, z64
from ZODB.serialize import ObjectWriter, ConnectionObjectReader, myhasattr
@@ -79,7 +80,7 @@
In many applications, root() is the only method of the Connection
that you will need to use.
-
+
Synchronization
---------------
@@ -104,7 +105,7 @@
XXX Mention the database pool.
:Groups:
-
+
- `User Methods`: root, get, add, close, db, sync, isReadOnly,
cacheGC, cacheFullSweep, cacheMinimize, getVersion,
modifiedInVersion
@@ -123,10 +124,9 @@
_tmp = None
_code_timestamp = 0
- _transaction = None
def __init__(self, version='', cache_size=400,
- cache_deactivate_after=None, mvcc=True):
+ cache_deactivate_after=None, mvcc=True, txn_mgr=None):
"""Create a new Connection.
A Connection instance should by instantiated by the DB
@@ -153,6 +153,13 @@
self._reset_counter = global_reset_counter
self._load_count = 0 # Number of objects unghosted
self._store_count = 0 # Number of objects stored
+ self._modified = []
+
+ # If a transaction manager is passed to the constructor, use
+ # it instead of the global transaction manager. The instance
+ # variable will hold either a TM class or the transaction
+ # module itself, which implements the same interface.
+ self._txn_mgr = txn_mgr or transaction
# _invalidated queues invalidate messages delivered from the DB
# _inv_lock prevents one thread from modifying the set while
@@ -187,21 +194,18 @@
self._import = None
def getTransaction(self):
- t = self._transaction
- if t is None:
- # Fall back to thread-bound transactions
- t = get_transaction()
- return t
+ # XXX mark this as deprecated?
+ return self._txn_mgr.get()
def setLocalTransaction(self):
"""Use a transaction bound to the connection rather than the thread"""
- # XXX mention that this should only be called when you open
- # a connection or at transaction boundaries (but the lattter are
- # hard to be sure about).
- if self._transaction is None:
- self._transaction = Transaction()
- return self._transaction
+ # XXX mark this method as depcrecated? note that it's
+ # signature changed?
+
+ if self._txn_mgr is transaction:
+ self._txn_mgr = transaction.TransactionManager()
+ return self._txn_mgr
def _cache_items(self):
# find all items on the lru list
@@ -248,7 +252,7 @@
if self._storage is None:
# XXX Should this be a ZODB-specific exception?
raise RuntimeError("The database connection is closed")
-
+
obj = self._cache.get(oid, None)
if obj is not None:
return obj
@@ -297,7 +301,7 @@
if self._storage is None:
# XXX Should this be a ZODB-specific exception?
raise RuntimeError("The database connection is closed")
-
+
marker = object()
oid = getattr(obj, "_p_oid", marker)
if oid is marker:
@@ -307,10 +311,13 @@
assert obj._p_oid is None
oid = obj._p_oid = self._storage.new_oid()
obj._p_jar = self
- self._added[oid] = obj
if self._added_during_commit is not None:
self._added_during_commit.append(obj)
- self.getTransaction().register(obj)
+ self._txn_mgr.get().register(obj)
+ # Add to _added after calling register(), so that _added
+ # can be used as a test for whether the object has been
+ # registered with the transaction.
+ self._added[oid] = obj
elif obj._p_jar is not self:
raise InvalidObjectReference(obj, obj._p_jar)
@@ -328,7 +335,7 @@
was closed will be processed.
If resetCaches() was called, the cache will be cleared.
-
+
:Parameters:
- `odb`: that database that owns the Connection
"""
@@ -337,7 +344,7 @@
# other attributes on open and clearing them on close?
# A Connection is only ever associated with a single DB
# and Storage.
-
+
self._db = odb
self._storage = odb._storage
self._sortKey = odb._storage.sortKey
@@ -409,7 +416,7 @@
def cacheGC(self):
"""Reduce cache size to target size.
-
+
Call _p_deactivate() on cached objects until the cache size
falls under the target size.
"""
@@ -453,7 +460,7 @@
except: # except what?
f = getattr(f, 'im_self', f)
self._log.error("Close callback failed for %s", f,
- sys.exc_info())
+ exc_info=sys.exc_info())
self.__onCloseCallbacks = None
self._storage = self._tmp = self.new_oid = None
self._debug_info = ()
@@ -472,7 +479,6 @@
oid = obj._p_oid
if oid in self._conflicts:
- self.getTransaction().register(obj)
raise ReadConflictError(object=obj)
if oid is None or obj._p_jar is not self:
@@ -537,7 +543,7 @@
src = self._storage
self._storage = self._tmp
self._tmp = None
-
+
self._log.debug("Commiting subtransaction of size %s", src.getSize())
oids = src._index.keys()
self._storage.tpc_begin(t)
@@ -603,7 +609,7 @@
:Parameters:
- `tid`: the storage-level id of the transaction that committed
- `oids`: oids is a set of oids, represented as a dict with oids
- as keys.
+ as keys.
"""
self._inv_lock.acquire()
try:
@@ -643,14 +649,17 @@
# There is some old Zope code that assigns _p_jar
# directly. That is no longer allowed, but we need to
# provide support for old code that still does it.
-
+
# XXX The actual complaint here is that an object without
# an oid is being registered. I can't think of any way to
# achieve that without assignment to _p_jar. If there is
# a way, this will be a very confusing warning.
warnings.warn("Assigning to _p_jar is deprecated",
DeprecationWarning)
- self.getTransaction().register(obj)
+ elif obj._p_oid in self._added:
+ # It was registered before it was added to _added.
+ return
+ self._txn_mgr.get().register(obj)
def root(self):
"""Return the database root object.
@@ -729,7 +738,7 @@
"""Load non-current state for obj or raise ReadConflictError."""
if not (self._mvcc and self._setstate_noncurrent(obj)):
- self.getTransaction().register(obj)
+ self._txn_mgr.get().register(obj)
self._conflicts[obj._p_oid] = True
raise ReadConflictError(object=obj)
@@ -771,7 +780,7 @@
finally:
self._inv_lock.release()
else:
- self.getTransaction().register(obj)
+ self._txn_mgr.get().register(obj)
raise ReadConflictError(object=obj)
def oldstate(self, obj, tid):
@@ -782,7 +791,7 @@
to other peristent objects are handled.
:return: a persistent object
-
+
:Parameters:
- `obj`: a persistent object from this Connection.
- `tid`: id of a transaction that wrote an earlier revision.
@@ -829,19 +838,16 @@
del obj._p_oid
del obj._p_jar
- def tpc_begin(self, transaction, sub=None):
+ def tpc_begin(self, transaction, sub=False):
self._modified = []
-
+
# _creating is a list of oids of new objects, which is used to
# remove them from the cache if a transaction aborts.
self._creating = []
- if sub:
+ if sub and self._tmp is None:
# Sub-transaction!
- if self._tmp is None:
- _tmp = TmpStore(self._version)
- self._tmp = self._storage
- self._storage = _tmp
- _tmp.registerDB(self._db, 0)
+ self._tmp = self._storage
+ self._storage = TmpStore(self._version, self._storage)
self._storage.tpc_begin(transaction)
@@ -920,7 +926,7 @@
self._flush_invalidations()
def sync(self):
- self.getTransaction().abort()
+ self._txn_mgr.get().abort()
sync = getattr(self._storage, 'sync', 0)
if sync:
sync()
@@ -949,5 +955,5 @@
new._p_oid = oid
new._p_jar = self
new._p_changed = 1
- self.getTransaction().register(new)
+ self._txn_mgr.get().register(new)
self._cache[oid] = new
More information about the Zope-Checkins
mailing list