[Zodb-checkins] SVN: ZODB/branches/pycon-multidb/src/ - Various
interface cleanups. ZODB.Connection.Connection is now completely
Christian Theune
ct at gocept.com
Sun Mar 20 12:50:11 EST 2005
Log message for revision 29591:
- Various interface cleanups. ZODB.Connection.Connection is now completely
documented in an interface.
- The IDataManager, IDataManagerOrignal and IResourceManager cludge was
cleaned up to be only IDataManager now which reflects the current
situation.
Changed:
U ZODB/branches/pycon-multidb/src/ZODB/BaseStorage.py
U ZODB/branches/pycon-multidb/src/ZODB/Connection.py
U ZODB/branches/pycon-multidb/src/ZODB/DB.py
U ZODB/branches/pycon-multidb/src/ZODB/TmpStore.py
U ZODB/branches/pycon-multidb/src/ZODB/interfaces.py
U ZODB/branches/pycon-multidb/src/persistent/interfaces.py
U ZODB/branches/pycon-multidb/src/transaction/interfaces.py
-=-
Modified: ZODB/branches/pycon-multidb/src/ZODB/BaseStorage.py
===================================================================
--- ZODB/branches/pycon-multidb/src/ZODB/BaseStorage.py 2005-03-20 02:14:40 UTC (rev 29590)
+++ ZODB/branches/pycon-multidb/src/ZODB/BaseStorage.py 2005-03-20 17:50:11 UTC (rev 29591)
@@ -252,6 +252,12 @@
pass
def tpc_finish(self, transaction, f=None):
+ # It's important that the storage calls the function we pass
+ # while it still has its lock. We don't want another thread
+ # to be able to read any updated data until we've had a chance
+ # to send an invalidation message to all of the other
+ # connections!
+
self._lock_acquire()
try:
if transaction is not self._transaction:
Modified: ZODB/branches/pycon-multidb/src/ZODB/Connection.py
===================================================================
--- ZODB/branches/pycon-multidb/src/ZODB/Connection.py 2005-03-20 02:14:40 UTC (rev 29590)
+++ ZODB/branches/pycon-multidb/src/ZODB/Connection.py 2005-03-20 17:50:11 UTC (rev 29591)
@@ -23,6 +23,12 @@
from persistent import PickleCache
+# interfaces
+from persistent.interfaces import IPersistentDataManager
+from ZODB.interfaces import IConnection
+from transaction.interfaces import IDataManager
+from zope.interface import implements
+
import transaction
from ZODB.ConflictResolution import ResolvedSerial
@@ -31,13 +37,10 @@
import ConflictError, ReadConflictError, InvalidObjectReference, \
ConnectionStateError
from ZODB.TmpStore import TmpStore
-from ZODB.utils import u64, oid_repr, z64, positive_id
from ZODB.serialize import ObjectWriter, ConnectionObjectReader, myhasattr
-from ZODB.interfaces import IConnection
-from ZODB.utils import DEPRECATED_ARGUMENT, deprecated36
+from ZODB.utils import u64, oid_repr, z64, positive_id, \
+ DEPRECATED_ARGUMENT, deprecated36
-from zope.interface import implements
-
global_reset_counter = 0
def resetCaches():
@@ -54,127 +57,19 @@
global_reset_counter += 1
class Connection(ExportImport, object):
- """Connection to ZODB for loading and storing objects.
+ """Connection to ZODB for loading and storing objects."""
- The Connection object serves as a data manager. The root() method
- on a Connection returns the root object for the database. This
- object and all objects reachable from it are associated with the
- Connection that loaded them. When a transaction commits, it uses
- the Connection to store modified objects.
+ implements(IConnection, IDataManager, IPersistentDataManager)
- Typical use of ZODB is for each thread to have its own
- Connection and that no thread should have more than one Connection
- to the same database. A thread is associated with a Connection by
- loading objects from that Connection. Objects loaded by one
- thread should not be used by another thread.
-
- A Connection can be associated with a single version when it is
- created. By default, a Connection is not associated with a
- version; it uses non-version data.
-
- Each Connection provides an isolated, consistent view of the
- database, by managing independent copies of objects in the
- database. At transaction boundaries, these copies are updated to
- reflect the current state of the database.
-
- You should not instantiate this class directly; instead call the
- open() method of a DB instance.
-
- In many applications, root() is the only method of the Connection
- that you will need to use.
-
- Synchronization
- ---------------
-
- A Connection instance is not thread-safe. It is designed to
- support a thread model where each thread has its own transaction.
- If an application has more than one thread that uses the
- connection or the transaction the connection is registered with,
- the application should provide locking.
-
- The Connection manages movement of objects in and out of object
- storage.
-
- TODO: We should document an intended API for using a Connection via
- multiple threads.
-
- TODO: We should explain that the Connection has a cache and that
- multiple calls to get() will return a reference to the same
- object, provided that one of the earlier objects is still
- referenced. Object identity is preserved within a connection, but
- not across connections.
-
- TODO: Mention the database pool.
-
- A database connection always presents a consistent view of the
- objects in the database, although it may not always present the
- most current revision of any particular object. Modifications
- made by concurrent transactions are not visible until the next
- transaction boundary (abort or commit).
-
- Two options affect consistency. By default, the mvcc and synch
- options are enabled by default.
-
- If you pass mvcc=True to db.open(), the Connection will never read
- non-current revisions of an object. Instead it will raise a
- ReadConflictError to indicate that the current revision is
- unavailable because it was written after the current transaction
- began.
-
- The logic for handling modifications assumes that the thread that
- opened a Connection (called db.open()) is the thread that will use
- the Connection. If this is not true, you should pass synch=False
- to db.open(). When the synch option is disabled, some transaction
- boundaries will be missed by the Connection; in particular, if a
- transaction does not involve any modifications to objects loaded
- from the Connection and synch is disabled, the Connection will
- miss the transaction boundary. Two examples of this behavior are
- db.undo() and read-only transactions.
-
-
- :Groups:
-
- - `User Methods`: root, get, add, close, db, sync, isReadOnly,
- cacheGC, cacheFullSweep, cacheMinimize, getVersion,
- modifiedInVersion
- - `Experimental Methods`: setLocalTransaction, getTransaction,
- onCloseCallbacks
- - `Transaction Data Manager Methods`: tpc_begin, tpc_vote,
- tpc_finish, tpc_abort, sortKey, abort, commit, commit_sub,
- abort_sub
- - `Database Invalidation Methods`: invalidate, _setDB
- - `IPersistentDataManager Methods`: setstate, register,
- setklassstate
- - `Other Methods`: oldstate, exchange, getDebugInfo, setDebugInfo,
- getTransferCounts
-
- """
- implements(IConnection)
-
_tmp = None
_code_timestamp = 0
+ # ZODB.IConnection
+
def __init__(self, version='', cache_size=400,
cache_deactivate_after=None, mvcc=True, txn_mgr=None,
synch=True):
- """Create a new Connection.
-
- A Connection instance should by instantiated by the DB
- instance that it is connected to.
-
- :Parameters:
- - `version`: the "version" that all changes will be made
- in, defaults to no version.
- - `cache_size`: the target size of the in-memory object
- cache, measured in objects.
- - `cache_deactivate_after`: deprecated, ignored
- - `mvcc`: boolean indicating whether MVCC is enabled
- - `txn_mgr`: transaction manager to use. None means
- used the default transaction manager.
- - `synch`: boolean indicating whether Connection should
- register for afterCompletion() calls.
- """
-
+ """Create a new Connection."""
self._log = logging.getLogger("ZODB.Connection")
self._storage = None
self._debug_info = ()
@@ -265,79 +160,8 @@
connection = new_con
return connection
- def getTransaction(self):
- """Get the current transaction for this connection.
-
- :deprecated:
-
- The transaction manager's get method works the same as this
- method. You can pass a transaction manager (TM) to DB.open()
- to control which TM the Connection uses.
- """
- deprecated36("getTransaction() is deprecated. "
- "Use the txn_mgr argument to DB.open() instead.")
- return self._txn_mgr.get()
-
- def setLocalTransaction(self):
- """Use a transaction bound to the connection rather than the thread.
-
- :deprecated:
-
- Returns the transaction manager used by the connection. You
- can pass a transaction manager (TM) to DB.open() to control
- which TM the Connection uses.
- """
- deprecated36("setLocalTransaction() is deprecated. "
- "Use the txn_mgr argument to DB.open() instead.")
- if self._txn_mgr is transaction.manager:
- if self._synch:
- self._txn_mgr.unregisterSynch(self)
- self._txn_mgr = transaction.TransactionManager()
- if self._synch:
- self._txn_mgr.registerSynch(self)
- return self._txn_mgr
-
- def _cache_items(self):
- # find all items on the lru list
- items = self._cache.lru_items()
- # fine everything. some on the lru list, some not
- everything = self._cache.cache_data
- # remove those items that are on the lru list
- for k,v in items:
- del everything[k]
- # return a list of [ghosts....not recently used.....recently used]
- return everything.items() + items
-
- def __repr__(self):
- if self._version:
- ver = ' (in version %s)' % `self._version`
- else:
- ver = ''
- return '<Connection at %08x%s>' % (positive_id(self), ver)
-
def get(self, oid):
- """Return the persistent object with oid 'oid'.
-
- If the object was not in the cache and the object's class is
- ghostable, then a ghost will be returned. If the object is
- already in the cache, a reference to the cached object will be
- returned.
-
- Applications seldom need to call this method, because objects
- are loaded transparently during attribute lookup.
-
- :return: persistent object corresponding to `oid`
-
- :Parameters:
- - `oid`: an object id
-
- :Exceptions:
- - `KeyError`: if oid does not exist. It is possible that an
- object does not exist as of the current transaction, but
- existed in the past. It may even exist again in the
- future, if the transaction that removed it is undone.
- - `ConnectionStateError`: if the connection is closed.
- """
+ """Return the persistent object with oid 'oid'."""
if self._storage is None:
raise ConnectionStateError("The database connection is closed")
@@ -359,33 +183,8 @@
self._cache[oid] = obj
return obj
- # deprecate this method?
- __getitem__ = get
-
def add(self, obj):
- """Add a new object 'obj' to the database and assign it an oid.
-
- A persistent object is normally added to the database and
- assigned an oid when it becomes reachable to an object already in
- the database. In some cases, it is useful to create a new
- object and use its oid (_p_oid) in a single transaction.
-
- This method assigns a new oid regardless of whether the object
- is reachable.
-
- The object is added when the transaction commits. The object
- must implement the IPersistent interface and must not
- already be associated with a Connection.
-
- :Parameters:
- - `obj`: a Persistent object
-
- :Exceptions:
- - `TypeError`: if obj is not a persistent object.
- - `InvalidObjectReference`: if obj is already associated
- with another connection.
- - `ConnectionStateError`: if the connection is closed.
- """
+ """Add a new object 'obj' to the database and assign it an oid."""
if self._storage is None:
raise ConnectionStateError("The database connection is closed")
@@ -409,75 +208,11 @@
raise InvalidObjectReference(obj, obj._p_jar)
def sortKey(self):
- # If two connections use the same storage, give them a
- # consistent order using id(). This is unique for the
- # lifetime of a connection, which is good enough.
- return "%s:%s" % (self._sortKey(), id(self))
+ """Return a consistent sort key for this connection."""
+ return "%s:%s" % (self._storage.sortKey(), id(self))
- def _setDB(self, odb, mvcc=None, txn_mgr=None, synch=None):
- """Register odb, the DB that this Connection uses.
-
- This method is called by the DB every time a Connection
- is opened. Any invalidations received while the Connection
- was closed will be processed.
-
- If the global module function resetCaches() was called, the
- cache will be cleared.
-
- :Parameters:
- - `odb`: database that owns the Connection
- - `mvcc`: boolean indicating whether MVCC is enabled
- - `txn_mgr`: transaction manager to use. None means
- used the default transaction manager.
- - `synch`: boolean indicating whether Connection should
- register for afterCompletion() calls.
- """
-
- # TODO: Why do we go to all the trouble of setting _db and
- # other attributes on open and clearing them on close?
- # A Connection is only ever associated with a single DB
- # and Storage.
-
- self._db = odb
- self._storage = odb._storage
- self._sortKey = odb._storage.sortKey
- self.new_oid = odb._storage.new_oid
- self._opened = time()
- if synch is not None:
- self._synch = synch
- if mvcc is not None:
- self._mvcc = mvcc
- self._txn_mgr = txn_mgr or transaction.manager
- if self._reset_counter != global_reset_counter:
- # New code is in place. Start a new cache.
- self._resetCache()
- else:
- self._flush_invalidations()
- if self._synch:
- self._txn_mgr.registerSynch(self)
- self._reader = ConnectionObjectReader(self, self._cache,
- self._db.classFactory)
-
- # Multi-database support
- self.connections = {self._db.database_name: self}
-
- def _resetCache(self):
- """Creates a new cache, discarding the old one.
-
- See the docstring for the resetCaches() function.
- """
- self._reset_counter = global_reset_counter
- self._invalidated.clear()
- cache_size = self._cache.cache_size
- self._cache = cache = PickleCache(self, cache_size)
-
def abort(self, transaction):
- """Abort modifications to registered objects.
-
- This tells the cache to invalidate changed objects. _p_jar
- and _p_oid are deleted from new objects.
- """
-
+ """Abort a transaction and forget all changes."""
for obj in self._registered_objects:
oid = obj._p_oid
assert oid is not None
@@ -490,70 +225,22 @@
self._tpc_cleanup()
- # Should there be a way to call incrgc directly?
- # Perhaps "full sweep" should do that?
+ # TODO: we should test what happens when cacheGC is called mid-transaction.
- # TODO: we should test what happens when these methods are called
- # mid-transaction.
-
- def cacheFullSweep(self, dt=None):
- deprecated36("cacheFullSweep is deprecated. "
- "Use cacheMinimize instead.")
- if dt is None:
- self._cache.full_sweep()
- else:
- self._cache.full_sweep(dt)
-
- def cacheMinimize(self, dt=DEPRECATED_ARGUMENT):
- """Deactivate all unmodified objects in the cache.
-
- Call _p_deactivate() on each cached object, attempting to turn
- it into a ghost. It is possible for individual objects to
- remain active.
-
- :Parameters:
- - `dt`: ignored. It is provided only for backwards compatibility.
- """
- if dt is not DEPRECATED_ARGUMENT:
- deprecated36("cacheMinimize() dt= is ignored.")
- self._cache.minimize()
-
def cacheGC(self):
- """Reduce cache size to target size.
-
- Call _p_deactivate() on cached objects until the cache size
- falls under the target size.
- """
+ """Reduce cache size to target size."""
self._cache.incrgc()
__onCloseCallbacks = None
def onCloseCallback(self, f):
- """Register a callable, f, to be called by close().
-
- The callable, f, will be called at most once, the next time
- the Connection is closed.
-
- :Parameters:
- - `f`: object that will be called on `close`
- """
+ """Register a callable, f, to be called by close()."""
if self.__onCloseCallbacks is None:
self.__onCloseCallbacks = []
self.__onCloseCallbacks.append(f)
def close(self):
- """Close the Connection.
-
- A closed Connection should not be used by client code. It
- can't load or store objects. Objects in the cache are not
- freed, because Connections are re-used and the cache are
- expected to be useful to the next client.
-
- When the Connection is closed, all callbacks registered by
- onCloseCallback() are invoked and the cache is scanned for
- old objects.
- """
-
+ """Close the Connection."""
if not self._needs_to_join:
# We're currently joined to a transaction.
raise ConnectionStateError("Cannot close a connection joined to "
@@ -590,7 +277,10 @@
# assert that here, because self may have been reused (by
# another thread) by the time we get back here.
+ # transaction.interfaces.IDataManager
+
def commit(self, transaction):
+ """Commit changes to an object"""
if self._import:
# TODO: This code seems important for Zope, but needs docs
# to explain why.
@@ -668,7 +358,8 @@
self._handle_serial(s, oid)
def commit_sub(self, t):
- """Commit all work done in all subtransactions for this transaction."""
+ """Commit all changes made in subtransactions and begin 2-phase commit
+ """
if self._tmp is None:
return
src = self._storage
@@ -689,7 +380,7 @@
self._handle_serial(s, oid, change=False)
def abort_sub(self, t):
- """Abort work done in all subtransactions for this transaction."""
+ """Discard all subtransaction data."""
if self._tmp is None:
return
src = self._storage
@@ -700,7 +391,7 @@
self._invalidate_creating(src._creating)
def _invalidate_creating(self, creating=None):
- """Dissown any objects newly saved in an uncommitted transaction."""
+ """Disown any objects newly saved in an uncommitted transaction."""
if creating is None:
creating = self._creating
self._creating = []
@@ -712,34 +403,42 @@
del o._p_jar
del o._p_oid
+ # The next two methods are callbacks for transaction synchronization.
+
+ def beforeCompletion(self, txn):
+ # We don't do anything before a commit starts.
+ pass
+
+ def afterCompletion(self, txn):
+ self._flush_invalidations()
+
+ def _flush_invalidations(self):
+ self._inv_lock.acquire()
+ try:
+ self._cache.invalidate(self._invalidated)
+ self._invalidated.clear()
+ self._txn_time = None
+ finally:
+ self._inv_lock.release()
+ # Now is a good time to collect some garbage
+ self._cache.incrgc()
+
+ def root(self):
+ """Return the database root object."""
+ return self.get(z64)
+
def db(self):
+ """Returns a handle to the database this connection belongs to."""
return self._db
- def getVersion(self):
- if self._storage is None:
- raise ConnectionStateError("The database connection is closed")
- return self._version
-
def isReadOnly(self):
+ """Returns True if the storage for this connection is read only."""
if self._storage is None:
raise ConnectionStateError("The database connection is closed")
return self._storage.isReadOnly()
def invalidate(self, tid, oids):
- """Notify the Connection that transaction 'tid' invalidated oids.
-
- When the next transaction boundary is reached, objects will be
- invalidated. If any of the invalidated objects is accessed by
- the current transaction, the revision written before C{tid}
- will be used.
-
- The DB calls this method, even when the Connection is closed.
-
- :Parameters:
- - `tid`: the storage-level id of the transaction that committed
- - `oids`: oids is a set of oids, represented as a dict with oids
- as keys.
- """
+ """Notify the Connection that transaction 'tid' invalidated oids."""
self._inv_lock.acquire()
try:
if self._txn_time is None:
@@ -748,72 +447,149 @@
finally:
self._inv_lock.release()
- # The next two methods are callbacks for transaction synchronization.
+ # IDataManager
- def beforeCompletion(self, txn):
- # We don't do anything before a commit starts.
- pass
+ def tpc_begin(self, transaction, sub=False):
+ """Begin commit of a transaction, starting the two-phase commit."""
+ self._modified = []
- def afterCompletion(self, txn):
- self._flush_invalidations()
+ # _creating is a list of oids of new objects, which is used to
+ # remove them from the cache if a transaction aborts.
+ self._creating = []
+ if sub and self._tmp is None:
+ # Sub-transaction!
+ self._tmp = self._storage
+ self._storage = TmpStore(self._version, self._storage)
- def _flush_invalidations(self):
- self._inv_lock.acquire()
- try:
- self._cache.invalidate(self._invalidated)
- self._invalidated.clear()
- self._txn_time = None
- finally:
- self._inv_lock.release()
- # Now is a good time to collect some garbage
- self._cache.incrgc()
+ self._storage.tpc_begin(transaction)
- def modifiedInVersion(self, oid):
+ def tpc_vote(self, transaction):
+ """Verify that a data manager can commit the transaction."""
try:
- return self._db.modifiedInVersion(oid)
- except KeyError:
- return self._version
+ vote = self._storage.tpc_vote
+ except AttributeError:
+ return
+ s = vote(transaction)
+ self._handle_serial(s)
- def register(self, obj):
- """Register obj with the current transaction manager.
+ def _handle_serial(self, store_return, oid=None, change=1):
+ """Handle the returns from store() and tpc_vote() calls."""
- A subclass could override this method to customize the default
- policy of one transaction manager for each thread.
+ # These calls can return different types depending on whether
+ # ZEO is used. ZEO uses asynchronous returns that may be
+ # returned in batches by the ClientStorage. ZEO1 can also
+ # return an exception object and expect that the Connection
+ # will raise the exception.
- obj must be an object loaded from this Connection.
- """
- assert obj._p_jar is self
- if obj._p_oid is None:
- # There is some old Zope code that assigns _p_jar
- # directly. That is no longer allowed, but we need to
- # provide support for old code that still does it.
+ # When commit_sub() exceutes a store, there is no need to
+ # update the _p_changed flag, because the subtransaction
+ # tpc_vote() calls already did this. The change=1 argument
+ # exists to allow commit_sub() to avoid setting the flag
+ # again.
- # The actual complaint here is that an object without
- # an oid is being registered. I can't think of any way to
- # achieve that without assignment to _p_jar. If there is
- # a way, this will be a very confusing warning.
- deprecated36("Assigning to _p_jar is deprecated, and will be "
- "changed to raise an exception.")
- elif obj._p_oid in self._added:
- # It was registered before it was added to _added.
+ # When conflict resolution occurs, the object state held by
+ # the connection does not match what is written to the
+ # database. Invalidate the object here to guarantee that
+ # the new state is read the next time the object is used.
+
+ if not store_return:
return
- self._register(obj)
+ if isinstance(store_return, str):
+ assert oid is not None
+ self._handle_one_serial(oid, store_return, change)
+ else:
+ for oid, serial in store_return:
+ self._handle_one_serial(oid, serial, change)
- def _register(self, obj=None):
- if obj is not None:
- self._registered_objects.append(obj)
- if self._needs_to_join:
- self._txn_mgr.get().join(self)
- self._needs_to_join = False
+ def _handle_one_serial(self, oid, serial, change):
+ if not isinstance(serial, str):
+ raise serial
+ obj = self._cache.get(oid, None)
+ if obj is None:
+ return
+ if serial == ResolvedSerial:
+ del obj._p_changed # transition from changed to ghost
+ else:
+ if change:
+ obj._p_changed = 0 # transition from changed to up-to-date
+ obj._p_serial = serial
- def root(self):
- """Return the database root object.
+ def tpc_finish(self, transaction):
+ """Indicate confirmation that the transaction is done."""
+ if self._tmp is not None:
+ # Commiting a subtransaction!
+ # There is no need to invalidate anything.
+ self._storage.tpc_finish(transaction)
+ self._storage._creating[:0]=self._creating
+ del self._creating[:]
+ else:
+ def callback(tid):
+ d = {}
+ for oid in self._modified:
+ d[oid] = 1
+ self._db.invalidate(tid, d, self)
+ self._storage.tpc_finish(transaction, callback)
+ self._tpc_cleanup()
- The root is a persistent.mapping.PersistentMapping.
- """
- return self.get(z64)
+ def tpc_abort(self, transaction):
+ """Abort a transaction."""
+ if self._import:
+ self._import = None
+ self._storage.tpc_abort(transaction)
+ self._cache.invalidate(self._modified)
+ self._invalidate_creating()
+ while self._added:
+ oid, obj = self._added.popitem()
+ del obj._p_oid
+ del obj._p_jar
+ self._tpc_cleanup()
+ def _tpc_cleanup(self):
+ """Performs cleanup operations to support tpc_finish and tpc_abort."""
+ self._conflicts.clear()
+ if not self._synch:
+ self._flush_invalidations()
+ self._needs_to_join = True
+ self._registered_objects = []
+
+ def sync(self):
+ """Manually update the view on the database."""
+ self._txn_mgr.get().abort()
+ sync = getattr(self._storage, 'sync', 0)
+ if sync:
+ sync()
+ self._flush_invalidations()
+
+ def getDebugInfo(self):
+ """Returns a tuple with different items for debugging the
+ connection.
+ """
+ return self._debug_info
+
+ def setDebugInfo(self, *args):
+ """Add the given items to the debug information of this connection."""
+ self._debug_info = self._debug_info + args
+
+ def getTransferCounts(self, clear=False):
+ """Returns the number of objects loaded and stored."""
+ res = self._load_count, self._store_count
+ if clear:
+ self._load_count = 0
+ self._store_count = 0
+ return res
+
+ ##############################################
+ # persistent.interfaces.IPersistentDatamanager
+
+ def oldstate(self, obj, tid):
+ """Return copy of 'obj' that was written by transaction 'tid'."""
+ assert obj._p_jar is self
+ p = self._storage.loadSerial(obj._p_oid, tid)
+ return self._reader.getState(p)
+
def setstate(self, obj):
+ """Turns the ghost 'obj' into a real object by loading it's from the
+ database."""
oid = obj._p_oid
if self._storage is None:
@@ -882,7 +658,6 @@
def _load_before_or_conflict(self, obj):
"""Load non-current state for obj or raise ReadConflictError."""
-
if not (self._mvcc and self._setstate_noncurrent(obj)):
self._register(obj)
self._conflicts[obj._p_oid] = True
@@ -931,27 +706,138 @@
self._register(obj)
raise ReadConflictError(object=obj)
- def oldstate(self, obj, tid):
- """Return copy of obj that was written by tid.
+ def register(self, obj):
+ """Register obj with the current transaction manager.
- The returned object does not have the typical metadata
- (_p_jar, _p_oid, _p_serial) set. I'm not sure how references
- to other peristent objects are handled.
+ A subclass could override this method to customize the default
+ policy of one transaction manager for each thread.
- :return: a persistent object
+ obj must be an object loaded from this Connection.
+ """
+ assert obj._p_jar is self
+ if obj._p_oid is None:
+ # There is some old Zope code that assigns _p_jar
+ # directly. That is no longer allowed, but we need to
+ # provide support for old code that still does it.
- :Parameters:
- - `obj`: a persistent object from this Connection.
- - `tid`: id of a transaction that wrote an earlier revision.
+ # The actual complaint here is that an object without
+ # an oid is being registered. I can't think of any way to
+ # achieve that without assignment to _p_jar. If there is
+ # a way, this will be a very confusing warning.
+ deprecated36("Assigning to _p_jar is deprecated, and will be "
+ "changed to raise an exception.")
+ elif obj._p_oid in self._added:
+ # It was registered before it was added to _added.
+ return
+ self._register(obj)
- :Exceptions:
- - `KeyError`: if tid does not exist or if tid deleted a revision
- of obj.
+ def _register(self, obj=None):
+ if obj is not None:
+ self._registered_objects.append(obj)
+ if self._needs_to_join:
+ self._txn_mgr.get().join(self)
+ self._needs_to_join = False
+
+ # PROTECTED stuff (used by e.g. ZODB.DB.DB)
+
+ def _cache_items(self):
+ # find all items on the lru list
+ items = self._cache.lru_items()
+ # fine everything. some on the lru list, some not
+ everything = self._cache.cache_data
+ # remove those items that are on the lru list
+ for k,v in items:
+ del everything[k]
+ # return a list of [ghosts....not recently used.....recently used]
+ return everything.items() + items
+
+ def _setDB(self, odb, mvcc=None, txn_mgr=None, synch=None):
+ """Register odb, the DB that this Connection uses.
+
+ This method is called by the DB every time a Connection
+ is opened. Any invalidations received while the Connection
+ was closed will be processed.
+
+ If the global module function resetCaches() was called, the
+ cache will be cleared.
+
+ Parameters:
+ odb: database that owns the Connection
+ mvcc: boolean indicating whether MVCC is enabled
+ txn_mgr: transaction manager to use. None means
+ used the default transaction manager.
+ synch: boolean indicating whether Connection should
+ register for afterCompletion() calls.
"""
- assert obj._p_jar is self
- p = self._storage.loadSerial(obj._p_oid, tid)
- return self._reader.getState(p)
+ # TODO: Why do we go to all the trouble of setting _db and
+ # other attributes on open and clearing them on close?
+ # A Connection is only ever associated with a single DB
+ # and Storage.
+
+ self._db = odb
+ self._storage = odb._storage
+ self.new_oid = odb._storage.new_oid
+ self._opened = time()
+ if synch is not None:
+ self._synch = synch
+ if mvcc is not None:
+ self._mvcc = mvcc
+ self._txn_mgr = txn_mgr or transaction.manager
+ if self._reset_counter != global_reset_counter:
+ # New code is in place. Start a new cache.
+ self._resetCache()
+ else:
+ self._flush_invalidations()
+ if self._synch:
+ self._txn_mgr.registerSynch(self)
+ self._reader = ConnectionObjectReader(self, self._cache,
+ self._db.classFactory)
+
+ # Multi-database support
+ self.connections = {self._db.database_name: self}
+
+ def _resetCache(self):
+ """Creates a new cache, discarding the old one.
+
+ See the docstring for the resetCaches() function.
+ """
+ self._reset_counter = global_reset_counter
+ self._invalidated.clear()
+ cache_size = self._cache.cache_size
+ self._cache = cache = PickleCache(self, cache_size)
+
+ # Python protocol
+
+ def __repr__(self):
+ if self._version:
+ ver = ' (in version %s)' % `self._version`
+ else:
+ ver = ''
+ return '<Connection at %08x%s>' % (positive_id(self), ver)
+
+ # DEPRECATION candidates
+
+ __getitem__ = get
+
+ def modifiedInVersion(self, oid):
+ """Returns the version the object with the given oid was modified in.
+
+ If it wasn't modified in a version, the current version of this
+ connection is returned.
+ """
+ try:
+ return self._db.modifiedInVersion(oid)
+ except KeyError:
+ import pdb; pdb.set_trace()
+ return self.getVersion()
+
+ def getVersion(self):
+ """Returns the version this connection is attached to."""
+ if self._storage is None:
+ raise ConnectionStateError("The database connection is closed")
+ return self._version
+
def setklassstate(self, obj):
# Special case code to handle ZClasses, I think.
# Called the cache when an object of type type is invalidated.
@@ -973,141 +859,60 @@
self._log.error("setklassstate failed", exc_info=sys.exc_info())
raise
- def tpc_begin(self, transaction, sub=False):
- self._modified = []
+ def exchange(self, old, new):
+ # called by a ZClasses method that isn't executed by the test suite
+ oid = old._p_oid
+ new._p_oid = oid
+ new._p_jar = self
+ new._p_changed = 1
+ self._register(new)
+ self._cache[oid] = new
- # _creating is a list of oids of new objects, which is used to
- # remove them from the cache if a transaction aborts.
- self._creating = []
- if sub and self._tmp is None:
- # Sub-transaction!
- self._tmp = self._storage
- self._storage = TmpStore(self._version, self._storage)
+ # DEPRECATED methods
- self._storage.tpc_begin(transaction)
+ def getTransaction(self):
+ """Get the current transaction for this connection.
- def tpc_vote(self, transaction):
- try:
- vote = self._storage.tpc_vote
- except AttributeError:
- return
- s = vote(transaction)
- self._handle_serial(s)
+ :deprecated:
- def _handle_serial(self, store_return, oid=None, change=1):
- """Handle the returns from store() and tpc_vote() calls."""
+ The transaction manager's get method works the same as this
+ method. You can pass a transaction manager (TM) to DB.open()
+ to control which TM the Connection uses.
+ """
+ deprecated36("getTransaction() is deprecated. "
+ "Use the txn_mgr argument to DB.open() instead.")
+ return self._txn_mgr.get()
- # These calls can return different types depending on whether
- # ZEO is used. ZEO uses asynchronous returns that may be
- # returned in batches by the ClientStorage. ZEO1 can also
- # return an exception object and expect that the Connection
- # will raise the exception.
+ def setLocalTransaction(self):
+ """Use a transaction bound to the connection rather than the thread.
- # When commit_sub() exceutes a store, there is no need to
- # update the _p_changed flag, because the subtransaction
- # tpc_vote() calls already did this. The change=1 argument
- # exists to allow commit_sub() to avoid setting the flag
- # again.
+ :deprecated:
- # When conflict resolution occurs, the object state held by
- # the connection does not match what is written to the
- # database. Invalidate the object here to guarantee that
- # the new state is read the next time the object is used.
+ Returns the transaction manager used by the connection. You
+ can pass a transaction manager (TM) to DB.open() to control
+ which TM the Connection uses.
+ """
+ deprecated36("setLocalTransaction() is deprecated. "
+ "Use the txn_mgr argument to DB.open() instead.")
+ if self._txn_mgr is transaction.manager:
+ if self._synch:
+ self._txn_mgr.unregisterSynch(self)
+ self._txn_mgr = transaction.TransactionManager()
+ if self._synch:
+ self._txn_mgr.registerSynch(self)
+ return self._txn_mgr
- if not store_return:
- return
- if isinstance(store_return, str):
- assert oid is not None
- self._handle_one_serial(oid, store_return, change)
+ def cacheFullSweep(self, dt=None):
+ deprecated36("cacheFullSweep is deprecated. "
+ "Use cacheMinimize instead.")
+ if dt is None:
+ self._cache.full_sweep()
else:
- for oid, serial in store_return:
- self._handle_one_serial(oid, serial, change)
+ self._cache.full_sweep(dt)
- def _handle_one_serial(self, oid, serial, change):
- if not isinstance(serial, str):
- raise serial
- obj = self._cache.get(oid, None)
- if obj is None:
- return
- if serial == ResolvedSerial:
- del obj._p_changed # transition from changed to ghost
- else:
- if change:
- obj._p_changed = 0 # transition from changed to up-to-date
- obj._p_serial = serial
+ def cacheMinimize(self, dt=DEPRECATED_ARGUMENT):
+ """Deactivate all unmodified objects in the cache."""
+ if dt is not DEPRECATED_ARGUMENT:
+ deprecated36("cacheMinimize() dt= is ignored.")
+ self._cache.minimize()
- def tpc_finish(self, transaction):
- # It's important that the storage calls the function we pass
- # while it still has its lock. We don't want another thread
- # to be able to read any updated data until we've had a chance
- # to send an invalidation message to all of the other
- # connections!
-
- if self._tmp is not None:
- # Commiting a subtransaction!
- # There is no need to invalidate anything.
- self._storage.tpc_finish(transaction)
- self._storage._creating[:0]=self._creating
- del self._creating[:]
- else:
- def callback(tid):
- d = {}
- for oid in self._modified:
- d[oid] = 1
- self._db.invalidate(tid, d, self)
- self._storage.tpc_finish(transaction, callback)
- self._tpc_cleanup()
-
- def tpc_abort(self, transaction):
- if self._import:
- self._import = None
- self._storage.tpc_abort(transaction)
- self._cache.invalidate(self._modified)
- self._invalidate_creating()
- while self._added:
- oid, obj = self._added.popitem()
- del obj._p_oid
- del obj._p_jar
- self._tpc_cleanup()
-
- # Common cleanup actions after tpc_finish/tpc_abort.
- def _tpc_cleanup(self):
- self._conflicts.clear()
- if not self._synch:
- self._flush_invalidations()
- self._needs_to_join = True
- self._registered_objects = []
-
-
- def sync(self):
- self._txn_mgr.get().abort()
- sync = getattr(self._storage, 'sync', 0)
- if sync:
- sync()
- self._flush_invalidations()
-
- def getDebugInfo(self):
- return self._debug_info
-
- def setDebugInfo(self, *args):
- self._debug_info = self._debug_info + args
-
- def getTransferCounts(self, clear=False):
- """Returns the number of objects loaded and stored.
-
- If clear is True, reset the counters.
- """
- res = self._load_count, self._store_count
- if clear:
- self._load_count = 0
- self._store_count = 0
- return res
-
- def exchange(self, old, new):
- # called by a ZClasses method that isn't executed by the test suite
- oid = old._p_oid
- new._p_oid = oid
- new._p_jar = self
- new._p_changed = 1
- self._register(new)
- self._cache[oid] = new
Modified: ZODB/branches/pycon-multidb/src/ZODB/DB.py
===================================================================
--- ZODB/branches/pycon-multidb/src/ZODB/DB.py 2005-03-20 02:14:40 UTC (rev 29590)
+++ ZODB/branches/pycon-multidb/src/ZODB/DB.py 2005-03-20 17:50:11 UTC (rev 29591)
@@ -581,7 +581,7 @@
def get_info(c):
# `result`, `time` and `version` are lexically inherited.
o = c._opened
- d = c._debug_info
+ d = c.getDebugInfo()
if d:
if len(d) == 1:
d = d[0]
Modified: ZODB/branches/pycon-multidb/src/ZODB/TmpStore.py
===================================================================
--- ZODB/branches/pycon-multidb/src/ZODB/TmpStore.py 2005-03-20 02:14:40 UTC (rev 29590)
+++ ZODB/branches/pycon-multidb/src/ZODB/TmpStore.py 2005-03-20 17:50:11 UTC (rev 29591)
@@ -61,6 +61,9 @@
serial = h[:8]
return self._file.read(size), serial
+ def sortKey(self):
+ return self._storage.sortKey()
+
# TODO: clarify difference between self._storage & self._db._storage
def modifiedInVersion(self, oid):
Modified: ZODB/branches/pycon-multidb/src/ZODB/interfaces.py
===================================================================
--- ZODB/branches/pycon-multidb/src/ZODB/interfaces.py 2005-03-20 02:14:40 UTC (rev 29590)
+++ ZODB/branches/pycon-multidb/src/ZODB/interfaces.py 2005-03-20 17:50:11 UTC (rev 29591)
@@ -19,12 +19,119 @@
from zope.interface import Interface, Attribute
class IConnection(Interface):
- """ZODB connection.
+ """Connection to ZODB for loading and storing objects.
- TODO: This interface is incomplete.
+ The Connection object serves as a data manager. The root() method
+ on a Connection returns the root object for the database. This
+ object and all objects reachable from it are associated with the
+ Connection that loaded them. When a transaction commits, it uses
+ the Connection to store modified objects.
+
+ Typical use of ZODB is for each thread to have its own
+ Connection and that no thread should have more than one Connection
+ to the same database. A thread is associated with a Connection by
+ loading objects from that Connection. Objects loaded by one
+ thread should not be used by another thread.
+
+ A Connection can be associated with a single version when it is
+ created. By default, a Connection is not associated with a
+ version; it uses non-version data.
+
+ Each Connection provides an isolated, consistent view of the
+ database, by managing independent copies of objects in the
+ database. At transaction boundaries, these copies are updated to
+ reflect the current state of the database.
+
+ You should not instantiate this class directly; instead call the
+ open() method of a DB instance.
+
+ In many applications, root() is the only method of the Connection
+ that you will need to use.
+
+ Synchronization
+ ---------------
+
+ A Connection instance is not thread-safe. It is designed to
+ support a thread model where each thread has its own transaction.
+ If an application has more than one thread that uses the
+ connection or the transaction the connection is registered with,
+ the application should provide locking.
+
+ The Connection manages movement of objects in and out of object
+ storage.
+
+ TODO: We should document an intended API for using a Connection via
+ multiple threads.
+
+ TODO: We should explain that the Connection has a cache and that
+ multiple calls to get() will return a reference to the same
+ object, provided that one of the earlier objects is still
+ referenced. Object identity is preserved within a connection, but
+ not across connections.
+
+ TODO: Mention the database pool.
+
+ A database connection always presents a consistent view of the
+ objects in the database, although it may not always present the
+ most current revision of any particular object. Modifications
+ made by concurrent transactions are not visible until the next
+ transaction boundary (abort or commit).
+
+ Two options affect consistency. By default, the mvcc and synch
+ options are enabled by default.
+
+ If you pass mvcc=True to db.open(), the Connection will never read
+ non-current revisions of an object. Instead it will raise a
+ ReadConflictError to indicate that the current revision is
+ unavailable because it was written after the current transaction
+ began.
+
+ The logic for handling modifications assumes that the thread that
+ opened a Connection (called db.open()) is the thread that will use
+ the Connection. If this is not true, you should pass synch=False
+ to db.open(). When the synch option is disabled, some transaction
+ boundaries will be missed by the Connection; in particular, if a
+ transaction does not involve any modifications to objects loaded
+ from the Connection and synch is disabled, the Connection will
+ miss the transaction boundary. Two examples of this behavior are
+ db.undo() and read-only transactions.
+
+ Groups of methods:
+
+ User Methods:
+ root, get, add, close, db, sync, isReadOnly, cacheGC, cacheFullSweep,
+ cacheMinimize, getVersion, modifiedInVersion
+
+ Experimental Methods:
+ onCloseCallbacks
+
+ Database Invalidation Methods:
+ invalidate
+
+ Other Methods: exchange, getDebugInfo, setDebugInfo,
+ getTransferCounts
"""
+ def __init__(version='', cache_size=400,
+ cache_deactivate_after=None, mvcc=True, txn_mgr=None,
+ synch=True):
+ """Create a new Connection.
+ A Connection instance should by instantiated by the DB
+ instance that it is connected to.
+
+ Parameters:
+ version: the "version" that all changes will be made in, defaults
+ to no version.
+ cache_size: the target size of the in-memory object cache, measured
+ in objects.
+ mvcc: boolean indicating whether MVCC is enabled
+ txn_mgr: transaction manager to use. None means used the default
+ transaction manager.
+ synch: boolean indicating whether Connection should register for
+ afterCompletion() calls.
+ """
+
def add(ob):
"""Add a new object 'obj' to the database and assign it an oid.
@@ -39,8 +146,108 @@
The object is added when the transaction commits. The object
must implement the IPersistent interface and must not
already be associated with a Connection.
+
+ Parameters:
+ obj: a Persistent object
+
+ Raises TypeError if obj is not a persistent object.
+
+ Raises InvalidObjectReference if obj is already associated with another
+ connection.
+
+ Raises ConnectionStateError if the connection is closed.
"""
+ def get(oid):
+ """Return the persistent object with oid 'oid'.
+
+ If the object was not in the cache and the object's class is
+ ghostable, then a ghost will be returned. If the object is
+ already in the cache, a reference to the cached object will be
+ returned.
+
+ Applications seldom need to call this method, because objects
+ are loaded transparently during attribute lookup.
+
+ Parameters:
+ oid: an object id
+
+ Raises KeyError if oid does not exist.
+
+ It is possible that an
+ object does not exist as of the current transaction, but
+ existed in the past. It may even exist again in the
+ future, if the transaction that removed it is undone.
+
+ Raises ConnectionStateError if the connection is closed.
+ """
+
+ def cacheMinimize():
+ """Deactivate all unmodified objects in the cache.
+
+ Call _p_deactivate() on each cached object, attempting to turn
+ it into a ghost. It is possible for individual objects to
+ remain active.
+ """
+
+ def cacheGC():
+ """Reduce cache size to target size.
+
+ Call _p_deactivate() on cached objects until the cache size
+ falls under the target size.
+ """
+
+ def onCloseCallback(f):
+ """Register a callable, f, to be called by close().
+
+ f will be called with no arguments before the Connection is closed.
+
+ Parameters:
+ f: method that will be called on `close`
+ """
+
+ def close():
+ """Close the Connection.
+
+ When the Connection is closed, all callbacks registered by
+ onCloseCallback() are invoked and the cache is garbage collected.
+
+ A closed Connection should not be used by client code. It can't load
+ or store objects. Objects in the cache are not freed, because
+ Connections are re-used and the cache is expected to be useful to the
+ next client.
+ """
+
+ def db():
+ """Returns a handle to the database this connection belongs to."""
+
+ def isReadOnly():
+ """Returns True if the storage for this connection is read only."""
+
+ def invalidate(tid, oids):
+ """Notify the Connection that transaction 'tid' invalidated oids.
+
+ When the next transaction boundary is reached, objects will be
+ invalidated. If any of the invalidated objects are accessed by the
+ current transaction, the revision written before Connection.tid will be
+ used.
+
+ The DB calls this method, even when the Connection is closed.
+
+ Parameters:
+ tid: the storage-level id of the transaction that committed
+ oids: oids is a set of oids, represented as a dict with oids as keys.
+ """
+
+ def root():
+ """Return the database root object.
+
+ The root is a persistent.mapping.PersistentMapping.
+ """
+
+ def getVersion():
+ """Returns the version this connection is attached to."""
+
# Multi-database support.
connections = Attribute("""\
@@ -114,3 +321,32 @@
entry.
""")
+ def sync(self):
+ """Manually update the view on the database.
+
+ This includes aborting the current transaction, getting a fresh and
+ consistent view of the data (synchronizing with the storage if possible)
+ and call cacheGC() for this connection.
+
+ This method was especially useful in ZODB 3.2 to better support
+ read-only connections that were affected by a couple of problems.
+ """
+
+ # Debug information
+
+ def getDebugInfo():
+ """Returns a tuple with different items for debugging the connection.
+
+ Debug information can be added to a connection by using setDebugInfo.
+ """
+
+ def setDebugInfo(*items):
+ """Add the given items to the debug information of this connection."""
+
+ def getTransferCounts(clear=False):
+ """Returns the number of objects loaded and stored.
+
+ If clear is True, reset the counters.
+ """
+
+
Modified: ZODB/branches/pycon-multidb/src/persistent/interfaces.py
===================================================================
--- ZODB/branches/pycon-multidb/src/persistent/interfaces.py 2005-03-20 02:14:40 UTC (rev 29590)
+++ ZODB/branches/pycon-multidb/src/persistent/interfaces.py 2005-03-20 17:50:11 UTC (rev 29591)
@@ -257,18 +257,35 @@
def setstate(object):
"""Load the state for the given object.
- The object should be in the ghost state.
- The object's state will be set and the object will end up
- in the saved state.
+ The object should be in the ghost state. The object's state will be
+ set and the object will end up in the saved state.
The object must provide the IPersistent interface.
"""
+ def oldstate(obj, tid):
+ """Return copy of 'obj' that was written by transaction 'tid'.
+
+ The returned object does not have the typical metadata (_p_jar, _p_oid,
+ _p_serial) set. I'm not sure how references to other peristent objects
+ are handled.
+
+ Parameters
+ obj: a persistent object from this Connection.
+ tid: id of a transaction that wrote an earlier revision.
+
+ Raises KeyError if tid does not exist or if tid deleted a revision of
+ obj.
+ """
+
def register(object):
"""Register an IPersistent with the current transaction.
This method must be called when the object transitions to
the changed state.
+
+ A subclass could override this method to customize the default
+ policy of one transaction manager for each thread.
"""
def mtime(object):
Modified: ZODB/branches/pycon-multidb/src/transaction/interfaces.py
===================================================================
--- ZODB/branches/pycon-multidb/src/transaction/interfaces.py 2005-03-20 02:14:40 UTC (rev 29590)
+++ ZODB/branches/pycon-multidb/src/transaction/interfaces.py 2005-03-20 17:50:11 UTC (rev 29591)
@@ -18,104 +18,7 @@
import zope.interface
-class IResourceManager(zope.interface.Interface):
- """Objects that manage resources transactionally.
-
- These objects may manage data for other objects, or they may manage
- non-object storages, such as relational databases.
-
- IDataManagerOriginal is the interface currently provided by ZODB
- database connections, but the intent is to move to the newer
- IDataManager.
- """
-
- # Two-phase commit protocol. These methods are called by the
- # ITransaction object associated with the transaction being
- # committed.
-
- def tpc_begin(transaction):
- """Begin two-phase commit, to save data changes.
-
- An implementation should do as much work as possible without
- making changes permanent. Changes should be made permanent
- when tpc_finish is called (or aborted if tpc_abort is called).
- The work can be divided between tpc_begin() and tpc_vote(), and
- the intent is that tpc_vote() be as fast as possible (to minimize
- the period of uncertainty).
-
- transaction is the ITransaction instance associated with the
- transaction being committed.
- """
-
- def tpc_vote(transaction):
- """Verify that a resource manager can commit the transaction.
-
- This is the last chance for a resource manager to vote 'no'. A
- resource manager votes 'no' by raising an exception.
-
- transaction is the ITransaction instance associated with the
- transaction being committed.
- """
-
- def tpc_finish(transaction):
- """Indicate confirmation that the transaction is done.
-
- transaction is the ITransaction instance associated with the
- transaction being committed.
-
- This should never fail. If this raises an exception, the
- database is not expected to maintain consistency; it's a
- serious error.
- """
-
- def tpc_abort(transaction):
- """Abort a transaction.
-
- transaction is the ITransaction instance associated with the
- transaction being committed.
-
- All changes made by the current transaction are aborted. Note
- that this includes all changes stored in any savepoints that may
- be associated with the current transaction.
-
- tpc_abort() can be called at any time, either in or out of the
- two-phase commit.
-
- This should never fail.
- """
-
- # The savepoint/rollback API.
-
- def savepoint(transaction):
- """Save partial transaction changes.
-
- There are two purposes:
-
- 1) To allow discarding partial changes without discarding all
- dhanges.
-
- 2) To checkpoint changes to disk that would otherwise live in
- memory for the duration of the transaction.
-
- Returns an object implementing ISavePoint2 that can be used
- to discard changes made since the savepoint was captured.
-
- An implementation that doesn't support savepoints should implement
- this method by returning a savepoint object that raises an
- exception when its rollback method is called. The savepoint method
- shouldn't raise an error. This way, transactions that create
- savepoints can proceed as long as an attempt is never made to roll
- back a savepoint.
- """
-
- def discard(transaction):
- """Discard changes within the transaction since the last savepoint.
-
- That means changes made since the last savepoint if one exists, or
- since the start of the transaction.
- """
-
-class IDataManagerOriginal(zope.interface.Interface):
+class IDataManager(zope.interface.Interface):
"""Objects that manage transactional storage.
These objects may manage data for other objects, or they may manage
@@ -155,7 +58,7 @@
has been called; this is only used when the transaction is
being committed.
- This call also implied the beginning of 2-phase commit.
+ This call also implies the beginning of 2-phase commit.
"""
# Two-phase commit protocol. These methods are called by the
@@ -180,10 +83,12 @@
"""
-
def tpc_abort(transaction):
"""Abort a transaction.
+ This is called by a transaction manager to end a two-phase commit on
+ the data manager.
+
This is always called after a tpc_begin call.
transaction is the ITransaction instance associated with the
@@ -202,6 +107,11 @@
database is not expected to maintain consistency; it's a
serious error.
+ It's important that the storage calls the passed function
+ while it still has its lock. We don't want another thread
+ to be able to read any updated data until we've had a chance
+ to send an invalidation message to all of the other
+ connections!
"""
def tpc_vote(transaction):
@@ -214,126 +124,47 @@
transaction being committed.
"""
- def commit(object, transaction):
- """CCCommit changes to an object
+ def commit(transaction):
+ """Commit modifications to registered objects.
Save the object as part of the data to be made persistent if
the transaction commits.
- """
- def abort(object, transaction):
- """Abort changes to an object
-
- Only changes made since the last transaction or
- sub-transaction boundary are discarded.
-
- This method may be called either:
-
- o Outside of two-phase commit, or
-
- o In the first phase of two-phase commit
-
+ This includes conflict detection and handling. If no conflicts or
+ errors occur it saves the objects in the storage.
"""
- def sortKey():
- """
- Return a key to use for ordering registered DataManagers
-
- ZODB uses a global sort order to prevent deadlock when it commits
- transactions involving multiple resource managers. The resource
- manager must define a sortKey() method that provides a global ordering
- for resource managers.
- """
-
-class IDataManager(zope.interface.Interface):
- """Data management interface for storing objects transactionally.
-
- ZODB database connections currently provides the older
- IDataManagerOriginal interface, but the intent is to move to this newer
- IDataManager interface.
-
- Our hope is that this interface will evolve and become the standard
- interface. There are some issues to be resolved first, like:
-
- - Probably want separate abort methods for use in and out of
- two-phase commit.
-
- - The savepoint api may need some more thought.
-
- """
-
- def prepare(transaction):
- """Perform the first phase of a 2-phase commit
-
- The data manager prepares for commit any changes to be made
- persistent. A normal return from this method indicated that
- the data manager is ready to commit the changes.
-
- The data manager must raise an exception if it is not prepared
- to commit the transaction after executing prepare().
-
- The transaction must match that used for preceeding
- savepoints, if any.
- """
-
- # This is equivalent to zodb3's tpc_begin, commit, and
- # tpc_vote combined.
-
def abort(transaction):
- """Abort changes made by transaction
+ """Abort a transaction and forget all changes.
- This may be called before two-phase commit or in the second
- phase of two-phase commit.
+ Abort must be called outside of a two-phase commit.
- The transaction must match that used for preceeding
- savepoints, if any.
-
+ Abort is called by the transaction manager to abort transactions
+ that are not yet in a two-phase commit.
"""
- # This is equivalent to *both* zodb3's abort and tpc_abort
- # calls. This should probably be split into 2 methods.
-
- def commit(transaction):
- """Finish two-phase commit
-
- The prepare method must be called, with the same transaction,
- before calling commit.
-
- """
-
- # This is equivalent to zodb3's tpc_finish
-
- def savepoint(transaction):
- """Do tentative commit of changes to this point.
-
- Should return an object implementing IRollback that can be used
- to rollback to the savepoint.
-
- Note that (unlike zodb3) this doesn't use a 2-phase commit
- protocol. If this call fails, or if a rollback call on the
- result fails, the (containing) transaction should be
- aborted. Aborting the containing transaction is *not* the
- responsibility of the data manager, however.
-
- An implementation that doesn't support savepoints should
- implement this method by returning a rollback implementation
- that always raises an error when it's rollback method is
- called. The savepoing method shouldn't raise an error. This
- way, transactions that create savepoints can proceed as long
- as an attempt is never made to roll back a savepoint.
-
- """
-
def sortKey():
- """
- Return a key to use for ordering registered DataManagers
+ """Return a key to use for ordering registered DataManagers
ZODB uses a global sort order to prevent deadlock when it commits
transactions involving multiple resource managers. The resource
manager must define a sortKey() method that provides a global ordering
for resource managers.
"""
+ # XXX: Alternate version:
+ #"""Return a consistent sort key for this connection.
+ #
+ #This allows ordering multiple connections that use the same storage in
+ #a consistent manner. This is unique for the lifetime of a connection,
+ #which is good enough to avoid ZEO deadlocks.
+ #"""
+ def beforeCompletion(transaction):
+ """Hook that is called by the transaction before completing a commit"""
+
+ def afterCompletion(transaction):
+ """Hook that is called by the transaction after completing a commit"""
+
class ITransaction(zope.interface.Interface):
"""Object representing a running transaction.
@@ -414,35 +245,7 @@
# Unsure: is this allowed to cause an exception here, during
# the two-phase commit, or can it toss data silently?
-class ISavePoint(zope.interface.Interface):
- """ISavePoint objects represent partial transaction changes.
- Sequences of savepoint objects are associated with transactions,
- and with IResourceManagers.
- """
-
- def rollback():
- """Discard changes made after this savepoint.
-
- This includes discarding (call the discard method on) all
- subsequent savepoints.
- """
-
- def discard():
- """Discard changes saved by this savepoint.
-
- That means changes made since the immediately preceding
- savepoint if one exists, or since the start of the transaction,
- until this savepoint.
-
- Once a savepoint has been discarded, it's an error to attempt
- to rollback or discard it again.
- """
-
- next_savepoint = zope.interface.Attribute(
- """The next savepoint (later in time), or None if self is the
- most recent savepoint.""")
-
class IRollback(zope.interface.Interface):
def rollback():
@@ -457,3 +260,4 @@
- The transaction has ended.
"""
+
More information about the Zodb-checkins
mailing list