[Zope3-checkins] CVS: Zope3/src/zodb - connection.py:1.6 db.py:1.5 interfaces.py:1.6
Jeremy Hylton
jeremy@zope.com
Tue, 28 Jan 2003 13:17:32 -0500
Update of /cvs-repository/Zope3/src/zodb
In directory cvs.zope.org:/tmp/cvs-serv3644/zodb
Modified Files:
connection.py db.py interfaces.py
Log Message:
Refactor Connection.
Delete unused methods, although some of these may need to come back if
Zope3 grows a service that wants to use them. Removed: oldstate(),
onCloseCallbacks(), applyCloseCallbacks(), cacheFullSweep(),
cacheMinimize().
Organize other methods into groups according to the interface they
implement. Rename methods that are not part of any interface so that
they start with an underscore.
Add sync() to IAppConnection.
Add __implements__ declarations for the interfaces in
transaction.interfaces (IDataManager, IRollback).
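For context, the transaction.interfaces.IDataManager methods added
below (prepare(), abort(), commit(), savepoint()) are what the
transaction machinery calls to drive a Connection through two-phase
commit. A rough sketch of that protocol, with the transaction object,
registration, and error handling simplified as assumptions:

    def commit_transaction(txn, data_managers):
        # Phase one: each data manager stores its changes and votes.
        try:
            for dm in data_managers:
                dm.prepare(txn)    # tpc_begin(), store(), tpc_vote()
        except Exception:
            # A failed vote aborts every participant.
            for dm in data_managers:
                dm.abort(txn)      # tpc_abort() plus cache invalidation
            raise
        # Phase two: all votes succeeded, so finish for real.
        for dm in data_managers:
            dm.commit(txn)         # tpc_finish() plus invalidations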
=== Zope3/src/zodb/connection.py 1.5 => 1.6 ===
--- Zope3/src/zodb/connection.py:1.5 Tue Jan 28 11:43:54 2003
+++ Zope3/src/zodb/connection.py Tue Jan 28 13:17:30 2003
@@ -60,6 +60,7 @@
from zodb.utils import p64, u64, Set, z64
from transaction import get_transaction
+import transaction.interfaces
from persistence.cache import Cache
from persistence.interfaces import IPersistentDataManager
@@ -74,7 +75,8 @@
storage.
"""
- __implements__ = IAppConnection, IConnection, IPersistentDataManager
+ __implements__ = (IAppConnection, IConnection, IPersistentDataManager,
+ transaction.interfaces.IDataManager)
def __init__(self, db, version='', cache_size=400):
self._db = db
@@ -104,9 +106,20 @@
# new_oid is used by serialize
self.new_oid = self._storage.new_oid
+ ######################################################################
+ # IAppConnection defines the next two methods
+ # root() and sync()
+
def root(self):
return self.get(z64)
+ def sync(self):
+ get_transaction().abort()
+ sync = getattr(self._storage, 'sync', None)
+ if sync is not None:
+ sync()
+ self._flush_invalidations()
+
def modifiedInVersion(self, oid):
try:
return self._storage.modifiedInVersion(oid)
@@ -242,39 +255,85 @@
def close(self):
self._cache.incrgc()
- self.applyCloseCallbacks()
self._opened = None
# Return the connection to the pool.
self._db._closeConnection(self)
- # XXX not sure what the callbacks are for, but they're used by Mount
+ ######################################################################
+ # transaction.interfaces.IDataManager requires the next four methods
+ # prepare(), abort(), commit(), savepoint()
- __onCloseCallbacks = None
+ def prepare(self, txn):
+ self._modified.clear()
+ self._created.clear()
+ if self._tmp is not None:
+ # commit_sub() will call tpc_begin() on the real storage
+ self._commit_sub(txn)
+ else:
+ self._storage.tpc_begin(txn)
- def onCloseCallback(self, f):
- if self.__onCloseCallbacks is None:
- self.__onCloseCallbacks = []
- self.__onCloseCallbacks.append(f)
-
- def applyCloseCallbacks(self):
- # Call the close callbacks.
- if self.__onCloseCallbacks is not None:
- for f in self.__onCloseCallbacks:
- try:
- f()
- except:
- f_self = getattr(f, 'im_self', f)
- self._logger.exception("Close callback failed for %s",
- f_self)
- self.__onCloseCallbacks = None
+ for obj in self._txns.get(txn, ()):
+ self._objcommit(obj, txn)
- # some cache-related methods
+ s = self._storage.tpc_vote(txn)
+ self._handle_serial(s)
+ return True
- def cacheFullSweep(self, dt=0):
- self._cache.full_sweep(dt)
+ def abort(self, txn):
+ # XXX need test to make sure it is safe to call abort()
+ # without calling prepare()
+ if self._tmp is not None:
+ self._abort_sub()
+ self._storage.tpc_abort(txn)
- def cacheMinimize(self, dt=0):
- self._cache.minimize(dt)
+ objs = self._txns.get(txn)
+ if objs is not None:
+ self._cache.invalidateMany([obj._p_oid for obj in objs])
+ del self._txns[txn]
+ self._flush_invalidations()
+ self._cache.invalidateMany(self._modified)
+ self._invalidate_created(self._created)
+ self._created = Set()
+ self._modified.clear()
+
+ def commit(self, txn):
+ # It's important that the storage call the function we pass
+ # (self._invalidate_modified) while it still has its
+ # lock. We don't want another thread to be able to read any
+ # updated data until we've had a chance to send an
+ # invalidation message to all of the other connections!
+
+ self._db.begin_invalidation()
+ # XXX We should really have a try/finally because the begin
+ # call acquired a lock that will only be released in
+ # _invalidate_modified().
+ self._storage.tpc_finish(txn, self._invalidate_modified)
+ try:
+ del self._txns[txn]
+ except KeyError:
+ pass
+
+ self._flush_invalidations()
+
+ def savepoint(self, txn):
+ if self._tmp is None:
+ tmp = TmpStore(self._version)
+ self._tmp = self._storage
+ self._storage = tmp
+ tmp.registerDB(self._db)
+ self._modified = Set()
+ self._created = Set()
+ self._storage.tpc_begin(txn)
+
+ for obj in self._txns.get(txn, ()):
+ self._objcommit(obj, txn)
+ self.importHook(txn) # hook for ExportImport
+
+ # The tpc_finish() of TmpStore returns an UndoInfo object.
+ undo = self._storage.tpc_finish(txn)
+ self._storage._created = self._created
+ self._created = Set()
+ return Rollback(self, undo)
def _flush_invalidations(self):
self._inv_lock.acquire()
@@ -286,7 +345,49 @@
# Now is a good time to collect some garbage
self._cache.incrgc()
- def objcommit(self, object, transaction):
+ def _handle_serial(self, store_return, oid=None, change=True):
+ """Handle the returns from store() and tpc_vote() calls."""
+
+ # These calls can return different types depending on whether
+ # ZEO is used. ZEO uses asynchronous returns that may be
+ # returned in batches by the ClientStorage. ZEO1 can also
+ # return an exception object and expect that the Connection
+ # will raise the exception.
+
+ # When _commit_sub() executes a store, there is no need to
+ # update the _p_changed flag, because the subtransaction
+ # tpc_vote() calls already did this. The change=False argument
+ # exists to allow _commit_sub() to avoid setting the flag
+ # again.
+ if not store_return:
+ return
+ if isinstance(store_return, StringType):
+ assert oid is not None
+ serial = store_return
+ obj = self._cache.get(oid, None)
+ if obj is None:
+ return
+ if serial == ResolvedSerial:
+ obj._p_changed = None
+ else:
+ if change:
+ obj._p_changed = 0
+ obj._p_serial = serial
+ else:
+ for oid, serial in store_return:
+ if not isinstance(serial, StringType):
+ raise serial
+ obj = self._cache.get(oid, None)
+ if obj is None:
+ continue
+ if serial == ResolvedSerial:
+ obj._p_changed = None
+ else:
+ if change:
+ obj._p_changed = 0
+ obj._p_serial = serial
+
+ def _objcommit(self, object, transaction):
oid = object._p_oid
self._logger.debug("commit object %s", u64(oid))
@@ -305,12 +406,9 @@
writer = ObjectWriter(self)
for obj in writer.newObjects(object):
- self.commit_store(writer, obj, transaction)
+ self._commit_store(writer, obj, transaction)
- def commit_store(self, writer, pobject, transaction):
- # XXX the file and pickler get reset each time through...
- # Maybe just create new ones each time? Except I'm not sure
- # how that interacts with the persistent_id attribute.
+ def _commit_store(self, writer, pobject, transaction):
oid = pobject._p_oid
serial = getattr(pobject, '_p_serial', None)
if serial is None:
@@ -330,8 +428,8 @@
self._cache[oid] = pobject
self._handle_serial(s, oid)
- def commit_sub(self, txn):
- """Commit all work done in subtransactions"""
+ def _commit_sub(self, txn):
+ # Commit all work done in subtransactions.
assert self._tmp is not None
tmp = self._storage
@@ -347,10 +445,10 @@
for oid in tmp._index:
data, serial = tmp.load(oid, tmp._bver)
s = self._storage.store(oid, serial, data, self._version, txn)
- self._handle_serial(s, oid, change=0)
+ self._handle_serial(s, oid, change=False)
- def abort_sub(self):
- """Abort work done in subtransactions"""
+ def _abort_sub(self):
+ # Abort work done in subtransactions.
assert self._tmp is not None
tmp = self._storage
@@ -361,7 +459,7 @@
self._invalidate_created(tmp._created)
def _invalidate_created(self, created):
- """Dis-own new objects from uncommitted transaction."""
+ # Dis-own new objects from uncommitted transaction.
for oid in created:
o = self._cache.get(oid)
if o is not None:
@@ -369,144 +467,15 @@
del o._p_oid
del self._cache[oid]
- ######################################################################
- # Transaction.IDataManager
-
- def oldstate(self, object, serial):
- """Return the state of an object as of serial.
-
- This routine is used by Zope's History facility.
- """
- p = self._storage.loadSerial(object._p_oid, serial)
- return self._reader.getState(p)
-
- def savepoint(self, txn):
- if self._tmp is None:
- tmp = TmpStore(self._version)
- self._tmp = self._storage
- self._storage = tmp
- tmp.registerDB(self._db)
- self._modified = Set()
- self._created = Set()
- self._storage.tpc_begin(txn)
-
- for obj in self._txns.get(txn, ()):
- self.objcommit(obj, txn)
- self.importHook(txn) # hook for ExportImport
-
- # The tpc_finish() of TmpStore returns an UndoInfo object.
- undo = self._storage.tpc_finish(txn)
- self._storage._created = self._created
- self._created = Set()
- return Rollback(self, undo)
-
- def abort(self, txn):
- # XXX need test to make sure it is safe to call abort()
- # without calling prepare()
- if self._tmp is not None:
- self.abort_sub()
- self._storage.tpc_abort(txn)
-
- objs = self._txns.get(txn)
- if objs is not None:
- self._cache.invalidateMany([obj._p_oid for obj in objs])
- del self._txns[txn]
- self._flush_invalidations()
- self._cache.invalidateMany(self._modified)
- self._invalidate_created(self._created)
- self._created = Set()
- self._modified.clear()
-
- def prepare(self, txn):
- self._modified.clear()
- self._created.clear()
- if self._tmp is not None:
- # commit_sub() will call tpc_begin() on the real storage
- self.commit_sub(txn)
- else:
- self._storage.tpc_begin(txn)
-
- for obj in self._txns.get(txn, ()):
- self.objcommit(obj, txn)
-
- s = self._storage.tpc_vote(txn)
- self._handle_serial(s)
- return True
-
- def _handle_serial(self, store_return, oid=None, change=1):
- """Handle the returns from store() and tpc_vote() calls."""
-
- # These calls can return different types depending on whether
- # ZEO is used. ZEO uses asynchronous returns that may be
- # returned in batches by the ClientStorage. ZEO1 can also
- # return an exception object and expect that the Connection
- # will raise the exception.
-
- # When commit_sub() exceutes a store, there is no need to
- # update the _p_changed flag, because the subtransaction
- # tpc_vote() calls already did this. The change=1 argument
- # exists to allow commit_sub() to avoid setting the flag
- # again.
- if not store_return:
- return
- if isinstance(store_return, StringType):
- assert oid is not None
- serial = store_return
- obj = self._cache.get(oid, None)
- if obj is None:
- return
- if serial == ResolvedSerial:
- obj._p_changed = None
- else:
- if change:
- obj._p_changed = 0
- obj._p_serial = serial
- else:
- for oid, serial in store_return:
- if not isinstance(serial, StringType):
- raise serial
- obj = self._cache.get(oid, None)
- if obj is None:
- continue
- if serial == ResolvedSerial:
- obj._p_changed = None
- else:
- if change:
- obj._p_changed = 0
- obj._p_serial = serial
-
- def commit(self, txn):
- # It's important that the storage call the function we pass
- # (self._invalidate_modified) while it still has its
- # lock. We don't want another thread to be able to read any
- # updated data until we've had a chance to send an
- # invalidation message to all of the other connections!
-
- self._db.begin_invalidation()
- # XXX We should really have a try/finally because the begin
- # call acquired a lock that will only be released in
- # _invalidate_modified().
- self._storage.tpc_finish(txn, self._invalidate_modified)
- try:
- del self._txns[txn]
- except KeyError:
- pass
-
- self._flush_invalidations()
-
def _invalidate_modified(self):
+ # Called from the storage's tpc_finish() method after
+ # self._db.begin_invalidation() is called. The begin_
+ # and finish_invalidation() methods acquire and release
+ # a lock.
for oid in self._modified:
self._db.invalidate(oid, self)
self._db.finish_invalidation()
- def sync(self):
- # XXX Is it safe to abort things right now?
- get_transaction().abort()
- sync = getattr(self._storage, 'sync', None)
- if sync is not None:
- sync()
- self._flush_invalidations()
-
class Rollback:
"""Rollback changes associated with savepoint"""
@@ -517,6 +486,8 @@
# XXX Should it be possible to rollback() to the same savepoint
# more than once?
+ __implements__ = transaction.interfaces.IRollback
+
def __init__(self, conn, tmp_undo):
self._conn = conn
self._tmp_undo = tmp_undo # undo info from the storage
@@ -642,6 +613,11 @@
self._tindex.clear()
class UndoInfo:
+ """A helper class for rollback.
+
+ The class stores the state necessary for rolling back to a
+ particular time.
+ """
def __init__(self, store, pos, index):
self._store = store
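To illustrate the savepoint path above: savepoint() moves the
Connection onto a TmpStore and returns a Rollback wrapping the
UndoInfo that TmpStore.tpc_finish() produced. A minimal sketch of the
round trip, driving the Connection directly; the rollback() method
name and the way objects get registered with the transaction are
assumptions:

    txn = get_transaction()
    # ... modify persistent objects registered with txn ...
    rollback = conn.savepoint(txn)   # flush changes so far to TmpStore
    # ... further modifications ...
    rollback.rollback()              # restore the state at the savepoint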
=== Zope3/src/zodb/db.py 1.4 => 1.5 ===
--- Zope3/src/zodb/db.py:1.4 Fri Jan 24 18:20:58 2003
+++ Zope3/src/zodb/db.py Tue Jan 28 13:17:30 2003
@@ -130,15 +130,6 @@
def abortVersion(self, version):
AbortVersion(self, version)
- # XXX I don't think the cache should be used via _cache.
- # Not sure that both full sweep and minimize need to stay.
-
- def cacheFullSweep(self):
- self._connectionMap(lambda c: c._cache.full_sweep())
-
- def cacheMinimize(self):
- self._connectionMap(lambda c: c._cache.minimize())
-
def close(self):
self._storage.close()
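The removed wrappers were thin conveniences over the per-connection
caches; any code that still needs a sweep can do the same thing
directly, with the caveat from the deleted XXX comment that reaching
into _cache this way is questionable:

    db._connectionMap(lambda c: c._cache.full_sweep())
    db._connectionMap(lambda c: c._cache.minimize())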
=== Zope3/src/zodb/interfaces.py 1.5 => 1.6 ===
--- Zope3/src/zodb/interfaces.py:1.5 Tue Jan 28 11:42:25 2003
+++ Zope3/src/zodb/interfaces.py Tue Jan 28 13:17:30 2003
@@ -237,6 +237,12 @@
def root():
"""Return the root of the database."""
+ def sync():
+ """Process pending invalidations.
+
+ If there is a current transaction, it will be aborted.
+ """
+
class IConnection(Interface):
"""Interface required of Connection by ZODB DB.