[Zope3-checkins] CVS: Zope3/src/zodb - utils.py:1.3 interfaces.py:1.11 db.py:1.10 connection.py:1.11
Jeremy Hylton
jeremy@zope.com
Wed, 5 Mar 2003 17:14:31 -0500
Update of /cvs-repository/Zope3/src/zodb
In directory cvs.zope.org:/tmp/cvs-serv18303/zodb
Modified Files:
utils.py interfaces.py db.py connection.py
Log Message:
Merge jeremy-atomic-invalidation-branch.
Change invalidation() APIs to pass a set of oids rather than an
individual oid. All the oids in one call must be processed
atomically.
=== Zope3/src/zodb/utils.py 1.2 => 1.3 ===
--- Zope3/src/zodb/utils.py:1.2 Wed Dec 25 09:12:16 2002
+++ Zope3/src/zodb/utils.py Wed Mar 5 17:14:30 2003
@@ -45,10 +45,21 @@
# This must be Python 2.2, which doesn't have a standard sets module.
# ZODB needs only a very limited subset of the Set API.
class Set(dict):
+ def __init__(self, arg=None):
+ if arg:
+ if isinstance(arg, dict):
+ self.update(arg)
+ else:
+ # XXX the proper sets version is much more robust
+ for o in arg:
+ self[o] = 1
def add(self, o):
self[o] = 1
+ def remove(self, o):
+ del self[o]
def __ior__(self, other):
if not isinstance(other, Set):
return NotImplemented
self.update(other)
return self
+
=== Zope3/src/zodb/interfaces.py 1.10 => 1.11 ===
--- Zope3/src/zodb/interfaces.py:1.10 Thu Feb 27 15:11:47 2003
+++ Zope3/src/zodb/interfaces.py Wed Mar 5 17:14:30 2003
@@ -256,22 +256,19 @@
If there is a current transaction, it will be aborted.
"""
-class IDatabase(Interface):
- """Interface between the database and its connections."""
+ def get(oid):
+ """Return object for `oid`.
- # XXX This interface needs to be redesigned, so I'm not going to
- # bother documenting the current interface.
-
- def begin_invalidation():
- pass
+ The object may be a ghost.
+ """
- def invalidate(oid, conn):
- pass
+class IDatabase(Interface):
+ """Interface between the database and its connections."""
- def finish_invalidation():
+ def invalidate(oids, conn=None, version=""):
pass
- def _closeConnection():
+ def connectionClose(conn):
pass
=== Zope3/src/zodb/db.py 1.9 => 1.10 ===
--- Zope3/src/zodb/db.py:1.9 Thu Feb 27 15:16:45 2003
+++ Zope3/src/zodb/db.py Wed Mar 5 17:14:30 2003
@@ -28,7 +28,7 @@
from zodb.connection import Connection
from zodb.serialize import getDBRoot
from zodb.ztransaction import Transaction
-from zodb.utils import z64
+from zodb.utils import z64, Set
from transaction import get_transaction
from transaction.interfaces import IDataManager
@@ -55,10 +55,12 @@
self.log = logging.getLogger("zodb")
- # Allocate locks:
- l=Lock()
- self._a=l.acquire
- self._r=l.release
+ # The lock protects access to the pool data structures.
+ # Store the lock acquire and release methods as methods
+ # of the instance.
+ l = Lock()
+ self._a = l.acquire
+ self._r = l.release
# Setup connection pools and cache info
# _pool is currently available (closed) connections
@@ -148,17 +150,7 @@
def getPoolSize(self):
return self._pool_size
- def begin_invalidation(self):
- # Must be called before first call to invalidate and before
- # the storage lock is held.
- self._a()
-
- def finish_invalidation(self):
- # Must be called after begin_invalidation() and after final
- # invalidate() call.
- self._r()
-
- def invalidate(self, oid, connection=None, version=''):
+ def invalidate(self, oids, connection=None, version=''):
"""Invalidate references to a given oid.
This is used to indicate that one of the connections has committed a
@@ -166,7 +158,6 @@
passed in to prevent useless (but harmless) messages to the
connection.
"""
- assert oid is not None
if connection is not None:
assert version == connection._version
version = connection._version
@@ -174,18 +165,18 @@
# Notify connections
for cc in self._allocated:
if cc is not connection:
- self.invalidateConnection(cc, oid, version)
+ self.invalidateConnection(cc, oids, version)
if self._temps:
# t accumulates all the connections that aren't closed.
t = []
for cc in self._temps:
if cc is not connection:
- self.invalidateConnection(cc, oid, version,
+ self.invalidateConnection(cc, oids, version,
t.append)
self._temps = t
- def invalidateConnection(self, conn, oid, version, alive=None):
+ def invalidateConnection(self, conn, oids, version, alive=None):
"""Send invalidation message to conn for oid on version.
If the modification occurred on a version, an invalidation is
@@ -205,7 +196,7 @@
if alive is not None:
alive(conn)
if not version or conn.getVersion() == version:
- conn.invalidate(oid)
+ conn.invalidate(oids)
def open(self, version='', transaction=None, temporary=0, force=None,
waitflag=1):
@@ -369,18 +360,16 @@
self._dest = dest
def _prepare(self, txn):
- self._oids = self._storage.commitVersion(self._version, self._dest,
- txn)
+ self._oids = Set(self._storage.commitVersion(self._version, self._dest,
+ txn))
def commit(self, txn):
super(CommitVersion, self).commit(txn)
- for oid in self._oids:
- self._db.invalidate(oid, version=self._dest)
+ self._db.invalidate(self._oids, version=self._dest)
if self._dest:
# the code above just invalidated the dest version.
# now we need to invalidate the source!
- for oid in self._oids:
- self._db.invalidate(oid, version=self._version)
+ self._db.invalidate(self._oids, version=self._version)
class AbortVersion(SimpleDataManager):
"""An object that will see to version abortion."""
@@ -390,12 +379,11 @@
self._version = version
def _prepare(self, txn):
- self._oids = self._storage.abortVersion(self._version, txn)
+ self._oids = Set(self._storage.abortVersion(self._version, txn))
def commit(self, txn):
super(AbortVersion, self).commit(txn)
- for oid in self._oids:
- self._db.invalidate(oid, version=self._version)
+ self._db.invalidate(self._oids, version=self._version)
class TransactionalUndo(SimpleDataManager):
"""An object that will see to transactional undo."""
@@ -405,9 +393,8 @@
self._tid = tid
def _prepare(self, txn):
- self._oids = self._storage.undo(self._tid, txn)
+ self._oids = Set(self._storage.undo(self._tid, txn))
def commit(self, txn):
super(TransactionalUndo, self).commit(txn)
- for oid in self._oids:
- self._db.invalidate(oid)
+ self._db.invalidate(self._oids)
=== Zope3/src/zodb/connection.py 1.10 => 1.11 ===
--- Zope3/src/zodb/connection.py:1.10 Wed Mar 5 15:24:38 2003
+++ Zope3/src/zodb/connection.py Wed Mar 5 17:14:30 2003
@@ -49,7 +49,7 @@
from zodb import interfaces
from zodb.conflict import ResolvedSerial
from zodb.export import ExportImport
-from zodb.interfaces import IConnection, ConflictError, IAppConnection
+from zodb.interfaces import *
from zodb.serialize import ConnectionObjectReader, ObjectWriter
from zodb.utils import p64, u64, Set, z64
@@ -88,7 +88,16 @@
# _invalidated queues invalidate messages delivered from the DB
# _inv_lock prevents one thread from modifying the set while
- # another is processing invalidations
+ # another is processing invalidations. All the invalidations
+ # from a single transaction should be applied atomically, so
+ # the lock must be held when reading _invalidated.
+
+ # XXX It sucks that we have to hold the lock to read
+ # _invalidated. Normally, _invalidated is written by call
+ # dict.update, which will execute atomically by virtue of the
+ # GIL. But some storage might generate oids where hash or
+ # compare invokes Python code. In that case, the GIL can't
+ # save us.
self._inv_lock = threading.Lock()
self._invalidated = Set()
self._committed = []
@@ -100,6 +109,11 @@
self._registered = Set()
self._modified = Set() # XXX is this the same as registered?
self._created = Set()
+ # _conflicts: set of objects that failed to load because
+ # of read conflicts. We must track these explicitly
+ # because they occur outside the two-phase commit and
+ # we must not allow the transaction they occur in to commit.
+ self._conflicts = Set()
# new_oid is used by serialize
self.newObjectId = self._storage.newObjectId
@@ -153,13 +167,10 @@
# setstate(), register(), mtime()
def setstate(self, obj):
+ # extremely paranoid: guard against obj not having an _p_oid.
oid = None
- # XXX Is it possible to reorganize the method-level try/except?
try:
oid = obj._p_oid
-
- # XXX this is quite conservative!
-
# Avoid reading data from a transaction that committed
# after the current transaction started, as that might
# lead to mixing of cached data from earlier transactions
@@ -168,33 +179,11 @@
# Wait for check until after data is loaded from storage
# to avoid time-of-check to time-of-use race.
p, serial = self._storage.load(oid, self._version)
-
- if oid in self._invalidated:
- if not (hasattr(obj, '_p_independent')
- and obj._p_independent()):
- self._get_transaction().join(self)
- raise ConflictError(object=obj)
- invalid = 1
- else:
- invalid = 0
-
+ invalid = self._is_invalidated(obj)
self._reader.setGhostState(obj, p)
obj._p_serial = serial
-
if invalid:
- if obj._p_independent():
- self._inv_lock.acquire()
- try:
- try:
- del self._invalidated[oid]
- except KeyError:
- pass
- finally:
- self._inv_lock.release()
- else:
- self._get_transaction().join(self)
- raise ConflictError(object=obj)
-
+ self._handle_independent(obj)
except ConflictError:
raise
except:
@@ -204,6 +193,44 @@
# Add the object to the cache active list
self._cache.setstate(oid, obj)
+ def _is_invalidated(self, obj):
+ # Helper method for setstate() covers three cases:
+ # returns false if obj is valid
+ # returns true if obj was invalidation, but is independent
+ # otherwise, raises ConflictError for invalidated objects
+ self._inv_lock.acquire()
+ try:
+ if obj._p_oid in self._invalidated:
+ # Defer _p_independent() call until state is loaded.
+ if hasattr(obj, "_p_independent"):
+ return True
+ else:
+ self._get_transaction().join(self)
+ self._conflicts.add(obj._p_oid)
+ raise ReadConflictError(object=obj)
+ else:
+ return False
+ finally:
+ self._inv_lock.release()
+
+ def _handle_independent(self, obj):
+ # Helper method for setstate() handles possibly independent objects
+ # Call _p_independent(), if it returns True, setstate() wins.
+ # Otherwise, raise a ConflictError.
+
+ if obj._p_independent():
+ self._inv_lock.acquire()
+ try:
+ try:
+ self._invalidated.remove(obj._p_oid)
+ except KeyError:
+ pass
+ finally:
+ self._inv_lock.release()
+ else:
+ self._get_transaction().join(self)
+ raise ReadConflictError(object=obj)
+
def register(self, obj):
if not self._registered:
self._get_transaction().join(self)
@@ -214,7 +241,7 @@
return None
######################################################################
- # IConnection requires the next three methods:
+ # IConnection requires the next five methods:
# getVersion(), reset(), cacheGC(), invalidate(), close()
def getVersion(self):
@@ -228,26 +255,28 @@
# version.
self._cache.clear()
self._version = version
- # This method doesn't acquire the lock, so it shouldn't be called
- # when the DB is delivering invalidations.
- self._cache.invalidateMany(self._invalidated)
- self._invalidated.clear()
+ self._inv_lock.acquire()
+ try:
+ self._cache.invalidateMany(self._invalidated)
+ self._invalidated.clear()
+ finally:
+ self._inv_lock.release()
self._open = True
def cacheGC(self):
self._cache.incrgc()
- def invalidate(self, oid):
+ def invalidate(self, oids):
"""Invalidate a particular oid
This marks the oid as invalid, but doesn't actually invalidate
it. The object data will be actually invalidated at certain
transaction boundaries.
"""
- assert oid is not None
+
self._inv_lock.acquire()
try:
- self._invalidated.add(oid)
+ self._invalidated.update(oids)
finally:
self._inv_lock.release()
@@ -271,6 +300,12 @@
# prepare(), abort(), commit(), savepoint()
def prepare(self, txn):
+ if self._conflicts:
+ # XXX should raise all of the conflicting oids, but
+ # haven't gotten around to changing the exception
+ # to store them.
+ oid = list(self._conflicts)[0]
+ raise ReadConflictError(oid)
self._modified.clear()
self._created.clear()
if self._tmp is not None:
@@ -303,20 +338,21 @@
self._flush_invalidations()
self._created.clear()
self._modified.clear()
+ self._conflicts.clear()
def commit(self, txn):
# It's important that the storage call the function we pass
- # (self._invalidate_modified) while it still has its
- # lock. We don't want another thread to be able to read any
+ # (self._invalidate_modified) while it still has its lock.
+ # We don't want another thread to be able to read any
# updated data until we've had a chance to send an
# invalidation message to all of the other connections!
- self._db.begin_invalidation()
- # XXX We should really have a try/finally because the begin
- # call acquired a lock that will only be released in
- # _invalidate_modified().
+ # If another thread could read the newly committed data
+ # before the invalidation is delivered, the connection would
+ # not be able to detect a read conflict.
self._storage.tpcFinish(txn, self._invalidate_modified)
self._txn = None
+ self._conflicts.clear()
self._flush_invalidations()
self._registered.clear()
self._created.clear()
@@ -342,6 +378,18 @@
self._created = Set()
return Rollback(self, undo)
+ def _invalidate_created(self, created):
+ # Dis-own new objects from uncommitted transaction.
+ for oid in created:
+ o = self._cache.get(oid)
+ if o is not None:
+ del o._p_jar
+ del o._p_oid
+ del self._cache[oid]
+
+ def _invalidate_modified(self):
+ self._db.invalidate(self._modified, self)
+
def _flush_invalidations(self):
self._inv_lock.acquire()
try:
@@ -404,9 +452,6 @@
object._p_oid = oid
self._created.add(oid)
elif object._p_changed:
- if (oid in self._invalidated and
- not hasattr(object, '_p_resolveConflict')):
- raise ConflictError(object=object)
self._modified.add(oid)
else:
return # Nothing to do
@@ -421,10 +466,13 @@
if serial is None:
self._created.add(oid)
else:
- # XXX this seems to duplicate code on objcommit()
- if (oid in self._invalidated and
- not hasattr(pobject, '_p_resolveConflict')):
- raise ConflictError(oid=oid)
+ self._inv_lock.acquire()
+ try:
+ if (oid in self._invalidated and
+ not hasattr(pobject, '_p_resolveConflict')):
+ raise ConflictError(oid=oid)
+ finally:
+ self._inv_lock.release()
self._modified.add(oid)
s = self._storage.store(oid, serial, writer.getState(pobject),
@@ -464,24 +512,6 @@
self._cache.invalidateMany(tmp._index)
self._invalidate_created(tmp._created)
-
- def _invalidate_created(self, created):
- # Dis-own new objects from uncommitted transaction.
- for oid in created:
- o = self._cache.get(oid)
- if o is not None:
- del o._p_jar
- del o._p_oid
- del self._cache[oid]
-
- def _invalidate_modified(self):
- # Called from the storage's tpc_finish() method after
- # self._db.begin_invalidation() is called. The begin_
- # and finish_invalidation() methods acquire and release
- # a lock.
- for oid in self._modified:
- self._db.invalidate(oid, self)
- self._db.finish_invalidation()
class Rollback:
"""Rollback changes associated with savepoint"""