[Zope3-checkins] CVS: Zope3/src/zodb - connection.py:1.9.2.2
Jeremy Hylton
jeremy@zope.com
Mon, 3 Mar 2003 17:16:50 -0500
Update of /cvs-repository/Zope3/src/zodb
In directory cvs.zope.org:/tmp/cvs-serv1511
Modified Files:
Tag: jeremy-atomic-invalidation-branch
connection.py
Log Message:
Several improvements for invalidation locking.
Add comment explaining why we need to use a lock even though
dict update() should be atomic.
Remove modifiedInVersion() which is not used anywhere.
Refactor setstate() to do invalidation checking in helper methods.
Remove duplicate checking of _invalidation. It was checked once in
_objcommit() and once in _commit_store(). Every object checked in
_objcommit() was also checked in _commit_store(), so move the check
there.
=== Zope3/src/zodb/connection.py 1.9.2.1 => 1.9.2.2 ===
--- Zope3/src/zodb/connection.py:1.9.2.1 Sat Mar 1 21:36:54 2003
+++ Zope3/src/zodb/connection.py Mon Mar 3 17:16:49 2003
@@ -92,7 +92,16 @@
# _invalidated queues invalidate messages delivered from the DB
# _inv_lock prevents one thread from modifying the set while
- # another is processing invalidations
+ # another is processing invalidations. All the invalidations
+ # from a single transaction should be applied atomically, so
+ # the lock must be held when reading _invalidated.
+
+ # XXX It sucks that we have to hold the lock to read
+ # _invalidated. Normally, _invalidated is written by call
+ # dict.update, which will execute atomically by virtue of the
+ # GIL. But some storage might generate oids where hash or
+ # compare invokes Python code. In that case, the GIL can't
+ # save us.
self._inv_lock = threading.Lock()
self._invalidated = Set()
self._committed = []
@@ -125,12 +134,6 @@
sync()
self._flush_invalidations()
- def modifiedInVersion(self, oid):
- try:
- return self._storage.modifiedInVersion(oid)
- except KeyError:
- return self._version
-
def get(self, oid):
# assume that a cache cannot store None as a valid object
object = self._cache.get(oid)
@@ -158,13 +161,10 @@
# setstate(), register(), mtime()
def setstate(self, obj):
+ # extremely paranoid: guard against obj not having an _p_oid.
oid = None
- # XXX Is it possible to reorganize the method-level try/except?
try:
oid = obj._p_oid
-
- # XXX this is quite conservative!
-
# Avoid reading data from a transaction that committed
# after the current transaction started, as that might
# lead to mixing of cached data from earlier transactions
@@ -173,34 +173,11 @@
# Wait for check until after data is loaded from storage
# to avoid time-of-check to time-of-use race.
p, serial = self._storage.load(oid, self._version)
-
- if oid in self._invalidated:
- if not (hasattr(obj, '_p_independent')
- and obj._p_independent()):
- get_transaction().join(self)
- self._conflicts.add(obj._p_oid)
- raise ReadConflictError(object=obj)
- invalid = 1
- else:
- invalid = 0
-
+ invalid = self._is_invalidated(obj)
self._reader.setGhostState(obj, p)
obj._p_serial = serial
-
if invalid:
- if obj._p_independent():
- self._inv_lock.acquire()
- try:
- try:
- del self._invalidated[oid]
- except KeyError:
- pass
- finally:
- self._inv_lock.release()
- else:
- get_transaction().join(self)
- raise ConflictError(object=obj)
-
+ self._handle_independent(obj)
except ConflictError:
raise
except:
@@ -210,6 +187,44 @@
# Add the object to the cache active list
self._cache.setstate(oid, obj)
+ def _is_invalidated(self, obj):
+ # Helper method for setstate() covers three cases:
+ # returns false if obj is valid
+ # returns true if obj was invalidation, but is independent
+ # otherwise, raises ConflictError for invalidated objects
+ self._inv_lock.acquire()
+ try:
+ if obj._p_oid in self._invalidated:
+ # Defer _p_independent() call until state is loaded.
+ if hasattr(obj, "_p_independent"):
+ return True
+ else:
+ get_transaction().join(self)
+ self._conflicts.add(obj._p_oid)
+ raise ReadConflictError(object=obj)
+ else:
+ return False
+ finally:
+ self._inv_lock.release()
+
+ def _handle_independent(self, obj):
+ # Helper method for setstate() handles possibly independent objects
+ # Call _p_independent(), if it returns True, setstate() wins.
+ # Otherwise, raise a ConflictError.
+
+ if obj._p_independent():
+ self._inv_lock.acquire()
+ try:
+ try:
+ self._invalidated.remove(obj._p_oid)
+ except KeyError:
+ pass
+ finally:
+ self._inv_lock.release()
+ else:
+ get_transaction().join(self)
+ raise ReadConflictError(object=obj)
+
def register(self, object):
txn = get_transaction()
L = self._txns.get(txn)
@@ -223,7 +238,7 @@
return None
######################################################################
- # IConnection requires the next three methods:
+ # IConnection requires the next five methods:
# getVersion(), reset(), cacheGC(), invalidate(), close()
def getVersion(self):
@@ -236,10 +251,12 @@
# version.
self._cache.clear()
self._version = version
- # This method doesn't acquire the lock, so it shouldn't be called
- # when the DB is delivering invalidations.
- self._cache.invalidateMany(self._invalidated)
- self._invalidated.clear()
+ self._inv_lock.acquire()
+ try:
+ self._cache.invalidateMany(self._invalidated)
+ self._invalidated.clear()
+ finally:
+ self._inv_lock.release()
self._opened = time.time()
def cacheGC(self):
@@ -252,6 +269,7 @@
it. The object data will be actually invalidated at certain
transaction boundaries.
"""
+
self._inv_lock.acquire()
try:
self._invalidated.update(oids)
@@ -420,9 +438,6 @@
object._p_oid = oid
self._created.add(oid)
elif object._p_changed:
- if (oid in self._invalidated and
- not hasattr(object, '_p_resolveConflict')):
- raise ConflictError(object=object)
self._modified.add(oid)
else:
return # Nothing to do
@@ -437,10 +452,13 @@
if serial is None:
self._created.add(oid)
else:
- # XXX this seems to duplicate code on objcommit()
- if (oid in self._invalidated and
- not hasattr(pobject, '_p_resolveConflict')):
- raise ConflictError(oid=oid)
+ self._inv_lock.acquire()
+ try:
+ if (oid in self._invalidated and
+ not hasattr(pobject, '_p_resolveConflict')):
+ raise ConflictError(oid=oid)
+ finally:
+ self._inv_lock.release()
self._modified.add(oid)
s = self._storage.store(oid, serial, writer.getState(pobject),